diff --git a/Cargo.lock b/Cargo.lock
index be3f179d5f..ba8c367d53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2393,6 +2393,17 @@ dependencies = [
  "minimal-lexical",
 ]
 
+[[package]]
+name = "nostarve_queue"
+version = "0.1.0"
+dependencies = [
+ "futures",
+ "rand",
+ "scopeguard",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "notify"
 version = "5.2.0"
@@ -2704,6 +2715,7 @@ dependencies = [
  "itertools",
  "metrics",
  "nix 0.26.2",
+ "nostarve_queue",
  "num-traits",
  "num_cpus",
  "once_cell",
diff --git a/Cargo.toml b/Cargo.toml
index 2b9da977e5..ac3e3c273d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ members = [
     "libs/tracing-utils",
     "libs/postgres_ffi/wal_craft",
     "libs/vm_monitor",
+    "libs/nostarve_queue",
 ]
 
 [workspace.package]
@@ -180,6 +181,7 @@ tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
 tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
 utils = { version = "0.1", path = "./libs/utils/" }
 vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
+nostarve_queue = { path = "./libs/nostarve_queue" }
 
 ## Common library dependency
 workspace_hack = { version = "0.1", path = "./workspace_hack/" }
diff --git a/libs/nostarve_queue/Cargo.toml b/libs/nostarve_queue/Cargo.toml
new file mode 100644
index 0000000000..716762d477
--- /dev/null
+++ b/libs/nostarve_queue/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "nostarve_queue"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+scopeguard.workspace = true
+tracing.workspace = true
+
+[dev-dependencies]
+futures.workspace = true
+rand.workspace = true
+tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] }
diff --git a/libs/nostarve_queue/src/lib.rs b/libs/nostarve_queue/src/lib.rs
new file mode 100644
index 0000000000..885e77c45b
--- /dev/null
+++ b/libs/nostarve_queue/src/lib.rs
@@ -0,0 +1,308 @@
+//! Synchronization primitive to prevent starvation among concurrent tasks that do the same work.
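+//!
+//! [`Queue::begin`] puts the caller in a FIFO line. [`Position::complete_and_wait`] hands the
+//! caller's finished work to the *longest-waiting* position and then waits until some position
+//! (its own, or another task's) hands a datum to it in turn, so no waiter is starved.
+//!
+//! A minimal usage sketch of the in-order case (the explicit runtime setup is only for the
+//! doctest; any async context works, and out-of-order hand-off is exercised in the tests):
+//!
+//! ```
+//! let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
+//! rt.block_on(async {
+//!     let queue = nostarve_queue::Queue::new(2);
+//!     let first = queue.begin().unwrap();
+//!     let second = queue.begin().unwrap();
+//!     // Each completion is delivered to the earliest waiter, which here happens
+//!     // to be the completing position itself.
+//!     assert_eq!(first.complete_and_wait(1).await, 1);
+//!     assert_eq!(second.complete_and_wait(2).await, 2);
+//! });
+//! ```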
+
+use std::{
+    collections::VecDeque,
+    fmt,
+    future::poll_fn,
+    sync::Mutex,
+    task::{Poll, Waker},
+};
+
+pub struct Queue<T> {
+    inner: Mutex<Inner<T>>,
+}
+
+struct Inner<T> {
+    waiters: VecDeque<usize>,
+    free: VecDeque<usize>,
+    slots: Vec<Option<(Option<Waker>, Option<T>)>>,
+}
+
+#[derive(Clone, Copy)]
+pub struct Position<'q, T> {
+    idx: usize,
+    queue: &'q Queue<T>,
+}
+
+impl<T> fmt::Debug for Position<'_, T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Position").field("idx", &self.idx).finish()
+    }
+}
+
+impl<T> Inner<T> {
+    #[cfg(not(test))]
+    #[inline]
+    fn integrity_check(&self) {}
+
+    #[cfg(test)]
+    fn integrity_check(&self) {
+        use std::collections::HashSet;
+        let waiters = self.waiters.iter().copied().collect::<HashSet<usize>>();
+        let free = self.free.iter().copied().collect::<HashSet<usize>>();
+        for (slot_idx, slot) in self.slots.iter().enumerate() {
+            match slot {
+                None => {
+                    assert!(!waiters.contains(&slot_idx));
+                    assert!(free.contains(&slot_idx));
+                }
+                Some((None, None)) => {
+                    assert!(waiters.contains(&slot_idx));
+                    assert!(!free.contains(&slot_idx));
+                }
+                Some((Some(_), Some(_))) => {
+                    assert!(!waiters.contains(&slot_idx));
+                    assert!(!free.contains(&slot_idx));
+                }
+                Some((Some(_), None)) => {
+                    assert!(waiters.contains(&slot_idx));
+                    assert!(!free.contains(&slot_idx));
+                }
+                Some((None, Some(_))) => {
+                    assert!(!waiters.contains(&slot_idx));
+                    assert!(!free.contains(&slot_idx));
+                }
+            }
+        }
+    }
+}
+
+impl<T> Queue<T> {
+    pub fn new(size: usize) -> Self {
+        Queue {
+            inner: Mutex::new(Inner {
+                waiters: VecDeque::new(),
+                free: (0..size).collect(),
+                slots: {
+                    let mut v = Vec::with_capacity(size);
+                    v.resize_with(size, || None);
+                    v
+                },
+            }),
+        }
+    }
+
+    /// Get in line. Fails with `Err(())` if all `size` positions are already taken.
+    pub fn begin(&self) -> Result<Position<'_, T>, ()> {
+        tracing::trace!("get in line: locking inner");
+        let mut inner = self.inner.lock().unwrap();
+        inner.integrity_check();
+        // No free slot means there are more concurrent `begin()` calls than the queue was sized for.
+        let my_waitslot_idx = inner.free.pop_front().ok_or(())?;
+        inner.waiters.push_back(my_waitslot_idx);
+        let prev = inner.slots[my_waitslot_idx].replace((None, None));
+        assert!(prev.is_none());
+        inner.integrity_check();
+        Ok(Position {
+            idx: my_waitslot_idx,
+            queue: self,
+        })
+    }
+}
+
+impl<'q, T> Position<'q, T> {
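+    /// Hand `datum` to the longest-waiting position in the queue, then wait until some
+    /// position (our own, or another task's) hands a datum to us in turn.
+    ///
+    /// The returned future must be driven to completion: dropping it before it resolves
+    /// panics (enforced by a scopeguard), because our position's slot would otherwise leak.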
+    pub fn complete_and_wait(self, datum: T) -> impl std::future::Future<Output = T> + 'q {
+        tracing::trace!("completing: locking inner");
+        let mut inner = self.queue.inner.lock().unwrap();
+        inner.integrity_check();
+        let winner_idx = inner.waiters.pop_front().expect("we put ourselves in");
+        tracing::trace!(winner_idx, "handing datum to the longest-waiting slot");
+        let winner_slot = inner.slots[winner_idx].as_mut().unwrap();
+        let prev = winner_slot.1.replace(datum);
+        assert!(
+            prev.is_none(),
+            "ensure we didn't mess up this simple ring buffer structure"
+        );
+        if let Some(waker) = winner_slot.0.take() {
+            tracing::trace!(winner_idx, "waking up winner");
+            waker.wake()
+        }
+        inner.integrity_check();
+        drop(inner); // the poll_fn locks it again
+
+        let mut poll_num = 0;
+        let mut drop_guard = Some(scopeguard::guard((), |()| {
+            panic!("must not drop this future until Ready");
+        }));
+
+        // wait for the datum that some position (possibly another task's) hands to us
+        poll_fn(move |cx| {
+            let my_waitslot_idx = self.idx;
+            poll_num += 1;
+            tracing::trace!(poll_num, "poll_fn locking inner");
+            let mut inner = self.queue.inner.lock().unwrap();
+            inner.integrity_check();
+            let my_waitslot = inner.slots[my_waitslot_idx].as_mut().unwrap();
+            // assert!(
+            //     poll_num <= 2,
+            //     "once we place the waker in the slot, next wakeup should have a result: {}",
+            //     my_waitslot.1.is_some()
+            // );
+            if let Some(res) = my_waitslot.1.take() {
+                tracing::trace!(poll_num, "datum is available");
+                // the .take() above reset the datum half of our slot to None
+                debug_assert!(my_waitslot.0.is_none());
+                debug_assert!(my_waitslot.1.is_none());
+                inner.slots[my_waitslot_idx] = None;
+                inner.free.push_back(my_waitslot_idx);
+                let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap());
+                inner.integrity_check();
+                return Poll::Ready(res);
+            }
+            // assert_eq!(poll_num, 1);
+            if !my_waitslot
+                .0
+                .as_ref()
+                .map(|existing| cx.waker().will_wake(existing))
+                .unwrap_or(false)
+            {
+                let prev = my_waitslot.0.replace(cx.waker().clone());
+                tracing::trace!(poll_num, prev_is_some = prev.is_some(), "updating waker");
+            }
+            inner.integrity_check();
+            tracing::trace!(poll_num, "waiting to be woken up");
+            Poll::Pending
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::{
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc,
+        },
+        task::Poll,
+        time::Duration,
+    };
+
+    use rand::RngCore;
+
+    #[tokio::test]
+    async fn in_order_completion_and_wait() {
+        let queue = super::Queue::new(2);
+
+        let q1 = queue.begin().unwrap();
+        let q2 = queue.begin().unwrap();
+
+        assert_eq!(q1.complete_and_wait(23).await, 23);
+        assert_eq!(q2.complete_and_wait(42).await, 42);
+    }
+
+    #[tokio::test]
+    async fn out_of_order_completion_and_wait() {
+        let queue = super::Queue::new(2);
+
+        let q1 = queue.begin().unwrap();
+        let q2 = queue.begin().unwrap();
+
+        let mut q2compfut = q2.complete_and_wait(23);
+
+        match futures::poll!(&mut q2compfut) {
+            Poll::Pending => {}
+            Poll::Ready(_) => panic!("should not be ready yet, it's queued after q1"),
+        }
+
+        let q1res = q1.complete_and_wait(42).await;
+        assert_eq!(q1res, 23);
+
+        let q2res = q2compfut.await;
+        assert_eq!(q2res, 42);
+    }
+
+    #[tokio::test]
+    async fn in_order_completion_out_of_order_wait() {
+        let queue = super::Queue::new(2);
+
+        let q1 = queue.begin().unwrap();
+        let q2 = queue.begin().unwrap();
+
+        let mut q1compfut = q1.complete_and_wait(23);
+
+        let mut q2compfut = q2.complete_and_wait(42);
+
+        match futures::poll!(&mut q2compfut) {
+            Poll::Pending => {
+                unreachable!("q2 should be ready, it wasn't first but q1 is serviced already")
+            }
+            Poll::Ready(x) => assert_eq!(x, 42),
+        }
+
+        assert_eq!(futures::poll!(&mut q1compfut), Poll::Ready(23));
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn stress() {
+        let ntasks = 8;
+        let queue_size = 8;
+        let queue = Arc::new(super::Queue::new(queue_size));
+
+        let stop = Arc::new(AtomicBool::new(false));
+
+        let mut tasks = vec![];
+        for i in 0..ntasks {
+            let jh = tokio::spawn({
+                let queue = Arc::clone(&queue);
+                let stop = Arc::clone(&stop);
+                async move {
+                    while !stop.load(Ordering::Relaxed) {
+                        let q = queue.begin().unwrap();
+                        for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
+                            std::hint::spin_loop();
+                        }
+                        q.complete_and_wait(i).await;
+                        tokio::task::yield_now().await;
+                    }
+                }
+            });
+            tasks.push(jh);
+        }
+
+        tokio::time::sleep(Duration::from_secs(10)).await;
+
+        stop.store(true, Ordering::Relaxed);
+
+        for t in tasks {
+            t.await.unwrap();
+        }
+    }
+
+    #[test]
+    fn stress_two_runtimes_shared_queue() {
+        std::thread::scope(|s| {
+            let ntasks = 8;
+            let queue_size = 8;
+            let queue = Arc::new(super::Queue::new(queue_size));
+
+            let stop = Arc::new(AtomicBool::new(false));
+
+            for i in 0..ntasks {
+                s.spawn({
+                    let queue = Arc::clone(&queue);
+                    let stop = Arc::clone(&stop);
+                    move || {
+                        let rt = tokio::runtime::Builder::new_current_thread()
+                            .enable_all()
+                            .build()
+                            .unwrap();
+                        rt.block_on(async move {
+                            while !stop.load(Ordering::Relaxed) {
+                                let q = queue.begin().unwrap();
+                                for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
+                                    std::hint::spin_loop();
+                                }
+                                q.complete_and_wait(i).await;
+                                tokio::task::yield_now().await;
+                            }
+                        });
+                    }
+                });
+            }
+
+            std::thread::sleep(Duration::from_secs(10));
+
+            stop.store(true, Ordering::Relaxed);
+        });
+    }
+}
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 3eb01003df..a40bd133fc 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
 nix.workspace = true
+nostarve_queue.workspace = true
 # hack to get the number of worker threads tokio uses
 num_cpus = { version = "1.15" }
 num-traits.workspace = true
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 3b114a7fd4..95c8a817c3 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -72,20 +72,18 @@
 //!
 
 use std::{
-    collections::{hash_map::Entry, HashMap, VecDeque, HashSet},
+    collections::{hash_map::Entry, HashMap},
     convert::TryInto,
-    future::poll_fn,
     sync::{
         atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
-        Arc, Mutex, Weak,
+        Arc, Weak,
     },
-    task::{Poll, Waker},
     time::Duration,
 };
 
 use anyhow::Context;
 use once_cell::sync::OnceCell;
-use tracing::{Instrument, instrument};
+use tracing::instrument;
 use utils::{
     id::{TenantId, TimelineId},
     lsn::Lsn,
@@ -254,18 +252,10 @@ pub struct PageCache {
     /// This is interpreted modulo the page cache size.
     next_evict_slot: AtomicUsize,
 
-    find_victim_waiters: Mutex<(
-        VecDeque<usize>,
-        VecDeque<usize>,
-        Vec<
-            Option<(
-                Option<Waker>,
-                Option<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
-            )>,
-        >,
-    )>,
-
     size_metrics: &'static PageCacheSizeMetrics,
+
+    find_victim_waiters:
+        nostarve_queue::Queue<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
 }
 
 struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
@@ -879,62 +869,10 @@ impl PageCache {
         &'static self,
         _permit_witness: &PinnedSlotsPermit,
     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
-        let integrity_check = |guard: &std::sync::MutexGuard<(
-            VecDeque<usize>,
-            VecDeque<usize>,
-            Vec<
-                Option<(
-                    Option<Waker>,
-                    Option<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)>,
-                )>,
-            >,
-        )>| {
-            let (waiters, free, slots) = &**guard;
-            let waiters = waiters.iter().copied().collect::<HashSet<usize>>();
-            let free = free.iter().copied().collect::<HashSet<usize>>();
-            for (slot_idx, slot) in slots.iter().enumerate() {
-                match slot {
-                    None => {
-                        assert!(!waiters.contains(&slot_idx));
-                        assert!(free.contains(&slot_idx));
-                    }
-                    Some((None, None)) => {
-                        assert!(waiters.contains(&slot_idx));
-                        assert!(!free.contains(&slot_idx));
-                    }
-                    Some((Some(_), Some(_))) => {
-                        assert!(!waiters.contains(&slot_idx));
-                        assert!(!free.contains(&slot_idx));
-                    }
-                    Some((Some(_), None)) => {
-                        assert!(waiters.contains(&slot_idx));
-                        assert!(!free.contains(&slot_idx));
-                    }
-                    Some((None, Some(_))) => {
-                        assert!(!waiters.contains(&slot_idx));
-                        assert!(!free.contains(&slot_idx));
-                    }
-                }
-            }
-        };
+        let nostarve_position = self.find_victim_waiters.begin()
+            .expect("we initialize the nostarve queue to the same size as the slots semaphore, and the caller is presenting a permit");
 
-        // Get in line.
-        let my_waitslot_idx = {
-            tracing::trace!("get in line locking waiters");
-            let mut guard = self.find_victim_waiters.lock().unwrap();
-            integrity_check(&guard);
-            let (waiters, free, waitslot) = &mut *guard;
-            let my_waitslot_idx = free
-                .pop_front()
-                .expect("can't happen, len(slots) = len(waiters");
-            waiters.push_back(my_waitslot_idx);
-            let prev = waitslot[my_waitslot_idx].replace((None, None));
-            debug_assert!(prev.is_none());
-            integrity_check(&guard);
-            my_waitslot_idx
-        };
-
-        let span = tracing::info_span!("find_victim", my_waitslot_idx);
+        let span = tracing::info_span!("find_victim", ?nostarve_position);
         let _enter = span.enter();
 
         let mut iters = 0;
@@ -961,67 +899,7 @@ impl PageCache {
                }
                crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
 
-                // put in the victim we found
-                {
-                    tracing::trace!("found victim locking waiters");
-                    let mut guard = self.find_victim_waiters.lock().unwrap();
-                    integrity_check(&guard);
-                    let (waiters, _, waitslots) = &mut *guard;
-                    let winner_idx = waiters.pop_front().expect("we put ourselves in");
-                    tracing::trace!(winner_idx, "putting victim into next waiters slot");
-                    let winner_slot = waitslots[winner_idx].as_mut().unwrap();
-                    let prev = winner_slot.1.replace((slot_idx, inner));
-                    debug_assert!(
-                        prev.is_none(),
-                        "ensure we didn't mess up this simple ring buffer structure"
-                    );
-                    if let Some(waker) = winner_slot.0.take() {
-                        tracing::trace!(winner_idx, "waking up winner");
-                        waker.wake()
-                    }
-                    integrity_check(&guard);
-                }
-
-                let mut poll_num = 0;
-                let mut drop_guard = Some(scopeguard::guard((), |()| {
-                    panic!("must not drop this future until Ready");
-                }));
-
-                // take the victim that was found by someone else
-                return Ok(poll_fn(move |cx| {
-                    poll_num += 1;
-                    tracing::trace!(poll_num, "poll_fn locking waiters");
-                    let mut guard = self.find_victim_waiters.lock().unwrap();
-                    integrity_check(&guard);
-                    let (_, free, waitslots) = &mut *guard;
-                    let my_waitslot = waitslots[my_waitslot_idx].as_mut().unwrap();
-                    // assert!(
-                    //     poll_num <= 2,
-                    //     "once we place the waker in the slot, next wakeup should have a result: {}",
-                    //     my_waitslot.1.is_some()
-                    // );
-                    if let Some(res) = my_waitslot.1.take() {
-                        tracing::trace!(poll_num, "have cache slot");
-                        // above .take() resets the waiters slot to None
-                        free.push_back(my_waitslot_idx);
-                        debug_assert!(my_waitslot.0.is_none());
-                        debug_assert!(my_waitslot.1.is_none());
-                        waitslots[my_waitslot_idx] = None;
-                        let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap());
-                        integrity_check(&guard);
-                        return Poll::Ready(res);
-                    }
-                    // assert_eq!(poll_num, 1);
-                    if !my_waitslot.0.as_ref().map(|existing| cx.waker().will_wake(existing)).unwrap_or(false) {
-                        let prev = my_waitslot.0.replace(cx.waker().clone());
-                        tracing::trace!(poll_num, prev_is_some=prev.is_some(), "updating waker");
-                    }
-                    integrity_check(&guard);
-                    tracing::trace!(poll_num, "waiting to be woken up");
-                    Poll::Pending
-                })
-                .instrument(span.clone())
-                .await);
+                return Ok(nostarve_position.complete_and_wait((slot_idx, inner)).await);
             }
         }
     }
@@ -1065,11 +943,7 @@ impl PageCache {
             next_evict_slot: AtomicUsize::new(0),
             size_metrics,
             pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
-            find_victim_waiters: Mutex::new((VecDeque::new(), (0..num_pages).collect(), {
-                let mut v = Vec::with_capacity(num_pages);
-                v.resize_with(num_pages, || None);
-                v
-            })),
+            find_victim_waiters: ::nostarve_queue::Queue::new(num_pages),
         }
     }
 }