diff --git a/Cargo.lock b/Cargo.lock index d605169986..a664437bad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2610,6 +2610,17 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nostarve_queue" +version = "0.1.0" +dependencies = [ + "futures", + "rand 0.8.5", + "scopeguard", + "tokio", + "tracing", +] + [[package]] name = "notify" version = "5.2.0" @@ -2951,6 +2962,7 @@ dependencies = [ "itertools", "metrics", "nix 0.26.2", + "nostarve_queue", "num-traits", "num_cpus", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index c50fb7be42..d89c51d3dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "libs/postgres_ffi/wal_craft", "libs/vm_monitor", "libs/walproposer", + "libs/nostarve_queue", ] [workspace.package] @@ -37,6 +38,7 @@ license = "Apache-2.0" [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } arc-swap = "1.6" +async-channel = "1.9.0" async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] } azure_core = "0.16" azure_identity = "0.16" @@ -191,6 +193,7 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" } walproposer = { version = "0.1", path = "./libs/walproposer/" } +nostarve_queue = { path = "./libs/nostarve_queue" } ## Common library dependency workspace_hack = { version = "0.1", path = "./workspace_hack/" } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 237df48543..71ffb811ea 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -21,7 +21,7 @@ use pageserver_api::models::{ use pageserver_api::shard::TenantShardId; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; -use reqwest::blocking::{Client, RequestBuilder, Response}; +use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use utils::auth::{Claims, Scope}; @@ -99,7 +99,7 @@ impl PageServerNode { pg_connection_config: PgConnectionConfig::new_host_port(host, port), conf: conf.clone(), env: env.clone(), - http_client: Client::new(), + http_client: ClientBuilder::new().timeout(None).build().unwrap(), http_base_url: format!("http://{}/v1", conf.listen_http_addr), } } diff --git a/libs/nostarve_queue/Cargo.toml b/libs/nostarve_queue/Cargo.toml new file mode 100644 index 0000000000..716762d477 --- /dev/null +++ b/libs/nostarve_queue/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "nostarve_queue" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +scopeguard.workspace = true +tracing.workspace = true + +[dev-dependencies] +futures.workspace = true +rand.workspace = true +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] } diff --git a/libs/nostarve_queue/src/lib.rs b/libs/nostarve_queue/src/lib.rs new file mode 100644 index 0000000000..57cd7f7b43 --- /dev/null +++ b/libs/nostarve_queue/src/lib.rs @@ -0,0 +1,316 @@ +//! Synchronization primitive to prevent starvation among concurrent tasks that do the same work. + +use std::{ + collections::VecDeque, + fmt, + future::poll_fn, + sync::Mutex, + task::{Poll, Waker}, +}; + +pub struct Queue { + inner: Mutex>, +} + +struct Inner { + waiters: VecDeque, + free: VecDeque, + slots: Vec, Option)>>, +} + +#[derive(Clone, Copy)] +pub struct Position<'q, T> { + idx: usize, + queue: &'q Queue, +} + +impl fmt::Debug for Position<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Position").field("idx", &self.idx).finish() + } +} + +impl Inner { + #[cfg(not(test))] + #[inline] + fn integrity_check(&self) {} + + #[cfg(test)] + fn integrity_check(&self) { + use std::collections::HashSet; + let waiters = self.waiters.iter().copied().collect::>(); + let free = self.free.iter().copied().collect::>(); + for (slot_idx, slot) in self.slots.iter().enumerate() { + match slot { + None => { + assert!(!waiters.contains(&slot_idx)); + assert!(free.contains(&slot_idx)); + } + Some((None, None)) => { + assert!(waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + Some((Some(_), Some(_))) => { + assert!(!waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + Some((Some(_), None)) => { + assert!(waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + Some((None, Some(_))) => { + assert!(!waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + } + } + } +} + +impl Queue { + pub fn new(size: usize) -> Self { + Queue { + inner: Mutex::new(Inner { + waiters: VecDeque::new(), + free: (0..size).collect(), + slots: { + let mut v = Vec::with_capacity(size); + v.resize_with(size, || None); + v + }, + }), + } + } + pub fn begin(&self) -> Result, ()> { + #[cfg(test)] + tracing::trace!("get in line locking inner"); + let mut inner = self.inner.lock().unwrap(); + inner.integrity_check(); + let my_waitslot_idx = inner + .free + .pop_front() + .expect("can't happen, len(slots) = len(waiters"); + inner.waiters.push_back(my_waitslot_idx); + let prev = inner.slots[my_waitslot_idx].replace((None, None)); + assert!(prev.is_none()); + inner.integrity_check(); + Ok(Position { + idx: my_waitslot_idx, + queue: &self, + }) + } +} + +impl<'q, T> Position<'q, T> { + pub fn complete_and_wait(self, datum: T) -> impl std::future::Future + 'q { + #[cfg(test)] + tracing::trace!("found victim locking waiters"); + let mut inner = self.queue.inner.lock().unwrap(); + inner.integrity_check(); + let winner_idx = inner.waiters.pop_front().expect("we put ourselves in"); + #[cfg(test)] + tracing::trace!(winner_idx, "putting victim into next waiters slot"); + let winner_slot = inner.slots[winner_idx].as_mut().unwrap(); + let prev = winner_slot.1.replace(datum); + assert!( + prev.is_none(), + "ensure we didn't mess up this simple ring buffer structure" + ); + if let Some(waker) = winner_slot.0.take() { + #[cfg(test)] + tracing::trace!(winner_idx, "waking up winner"); + waker.wake() + } + inner.integrity_check(); + drop(inner); // the poll_fn locks it again + + let mut poll_num = 0; + let mut drop_guard = Some(scopeguard::guard((), |()| { + panic!("must not drop this future until Ready"); + })); + + // take the victim that was found by someone else + poll_fn(move |cx| { + let my_waitslot_idx = self.idx; + poll_num += 1; + #[cfg(test)] + tracing::trace!(poll_num, "poll_fn locking waiters"); + let mut inner = self.queue.inner.lock().unwrap(); + inner.integrity_check(); + let my_waitslot = inner.slots[self.idx].as_mut().unwrap(); + // assert!( + // poll_num <= 2, + // "once we place the waker in the slot, next wakeup should have a result: {}", + // my_waitslot.1.is_some() + // ); + if let Some(res) = my_waitslot.1.take() { + #[cfg(test)] + tracing::trace!(poll_num, "have cache slot"); + // above .take() resets the waiters slot to None + debug_assert!(my_waitslot.0.is_none()); + debug_assert!(my_waitslot.1.is_none()); + inner.slots[my_waitslot_idx] = None; + inner.free.push_back(my_waitslot_idx); + let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap()); + inner.integrity_check(); + return Poll::Ready(res); + } + // assert_eq!(poll_num, 1); + if !my_waitslot + .0 + .as_ref() + .map(|existing| cx.waker().will_wake(existing)) + .unwrap_or(false) + { + let prev = my_waitslot.0.replace(cx.waker().clone()); + #[cfg(test)] + tracing::trace!(poll_num, prev_is_some = prev.is_some(), "updating waker"); + } + inner.integrity_check(); + #[cfg(test)] + tracing::trace!(poll_num, "waiting to be woken up"); + Poll::Pending + }) + } +} + +#[cfg(test)] +mod test { + use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::Poll, + time::Duration, + }; + + use rand::RngCore; + + #[tokio::test] + async fn in_order_completion_and_wait() { + let queue = super::Queue::new(2); + + let q1 = queue.begin().unwrap(); + let q2 = queue.begin().unwrap(); + + assert_eq!(q1.complete_and_wait(23).await, 23); + assert_eq!(q2.complete_and_wait(42).await, 42); + } + + #[tokio::test] + async fn out_of_order_completion_and_wait() { + let queue = super::Queue::new(2); + + let q1 = queue.begin().unwrap(); + let q2 = queue.begin().unwrap(); + + let mut q2compfut = q2.complete_and_wait(23); + + match futures::poll!(&mut q2compfut) { + Poll::Pending => {} + Poll::Ready(_) => panic!("should not be ready yet, it's queued after q1"), + } + + let q1res = q1.complete_and_wait(42).await; + assert_eq!(q1res, 23); + + let q2res = q2compfut.await; + assert_eq!(q2res, 42); + } + + #[tokio::test] + async fn in_order_completion_out_of_order_wait() { + let queue = super::Queue::new(2); + + let q1 = queue.begin().unwrap(); + let q2 = queue.begin().unwrap(); + + let mut q1compfut = q1.complete_and_wait(23); + + let mut q2compfut = q2.complete_and_wait(42); + + match futures::poll!(&mut q2compfut) { + Poll::Pending => { + unreachable!("q2 should be ready, it wasn't first but q1 is serviced already") + } + Poll::Ready(x) => assert_eq!(x, 42), + } + + assert_eq!(futures::poll!(&mut q1compfut), Poll::Ready(23)); + } + + #[tokio::test(flavor = "multi_thread")] + async fn stress() { + let ntasks = 8; + let queue_size = 8; + let queue = Arc::new(super::Queue::new(queue_size)); + + let stop = Arc::new(AtomicBool::new(false)); + + let mut tasks = vec![]; + for i in 0..ntasks { + let jh = tokio::spawn({ + let queue = Arc::clone(&queue); + let stop = Arc::clone(&stop); + async move { + while !stop.load(Ordering::Relaxed) { + let q = queue.begin().unwrap(); + for _ in 0..(rand::thread_rng().next_u32() % 10_000) { + std::hint::spin_loop(); + } + q.complete_and_wait(i).await; + tokio::task::yield_now().await; + } + } + }); + tasks.push(jh); + } + + tokio::time::sleep(Duration::from_secs(10)).await; + + stop.store(true, Ordering::Relaxed); + + for t in tasks { + t.await.unwrap(); + } + } + + #[test] + fn stress_two_runtimes_shared_queue() { + std::thread::scope(|s| { + let ntasks = 8; + let queue_size = 8; + let queue = Arc::new(super::Queue::new(queue_size)); + + let stop = Arc::new(AtomicBool::new(false)); + + for i in 0..ntasks { + s.spawn({ + let queue = Arc::clone(&queue); + let stop = Arc::clone(&stop); + move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + while !stop.load(Ordering::Relaxed) { + let q = queue.begin().unwrap(); + for _ in 0..(rand::thread_rng().next_u32() % 10_000) { + std::hint::spin_loop(); + } + q.complete_and_wait(i).await; + tokio::task::yield_now().await; + } + }); + } + }); + } + + std::thread::sleep(Duration::from_secs(10)); + + stop.store(true, Ordering::Relaxed); + }); + } +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 3eb01003df..a40bd133fc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,6 +37,7 @@ humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true nix.workspace = true +nostarve_queue.workspace = true # hack to get the number of worker threads tokio uses num_cpus = { version = "1.15" } num-traits.workspace = true diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index d5915f4c98..d849ed5363 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy = Lazy::new(|| { #[strum(serialize_all = "kebab_case")] pub(crate) enum PageCacheErrorKind { AcquirePinnedSlotTimeout, - EvictIterLimit, } pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 0702057766..4ead0fefb5 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -83,6 +83,7 @@ use std::{ use anyhow::Context; use once_cell::sync::OnceCell; +use tracing::instrument; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, @@ -252,6 +253,9 @@ pub struct PageCache { next_evict_slot: AtomicUsize, size_metrics: &'static PageCacheSizeMetrics, + + find_victim_waiters: + nostarve_queue::Queue<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, } struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit); @@ -430,8 +434,9 @@ impl PageCache { /// /// Store an image of the given page in the cache. /// + #[cfg_attr(test, instrument(skip_all, level = "trace", fields(%key, %lsn)))] pub async fn memorize_materialized_page( - &self, + &'static self, tenant_id: TenantId, timeline_id: TimelineId, key: Key, @@ -522,8 +527,9 @@ impl PageCache { // Section 1.2: Public interface functions for working with immutable file pages. + #[cfg_attr(test, instrument(skip_all, level = "trace", fields(?file_id, ?blkno)))] pub async fn read_immutable_buf( - &self, + &'static self, file_id: FileId, blkno: u32, ctx: &RequestContext, @@ -629,7 +635,7 @@ impl PageCache { /// ``` /// async fn lock_for_read( - &self, + &'static self, cache_key: &mut CacheKey, ctx: &RequestContext, ) -> anyhow::Result { @@ -851,10 +857,15 @@ impl PageCache { /// /// On return, the slot is empty and write-locked. async fn find_victim( - &self, + &'static self, _permit_witness: &PinnedSlotsPermit, ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { - let iter_limit = self.slots.len() * 10; + let nostarve_position = self.find_victim_waiters.begin() + .expect("we initialize the nostarve queue to the same size as the slots semaphore, and the caller is presenting a permit"); + + let span = tracing::info_span!("find_victim", ?nostarve_position); + let _enter = span.enter(); + let mut iters = 0; loop { iters += 1; @@ -866,41 +877,8 @@ impl PageCache { let mut inner = match slot.inner.try_write() { Ok(inner) => inner, Err(_err) => { - if iters > iter_limit { - // NB: Even with the permits, there's no hard guarantee that we will find a slot with - // any particular number of iterations: other threads might race ahead and acquire and - // release pins just as we're scanning the array. - // - // Imagine that nslots is 2, and as starting point, usage_count==1 on all - // slots. There are two threads running concurrently, A and B. A has just - // acquired the permit from the semaphore. - // - // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search - // B: Acquire permit. - // B: Look at slot 2, decrement its usage_count to zero and continue the search - // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // B: Acquire permit. - // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // - // Now we're back in the starting situation that both slots have - // usage_count 1, but A has now been through one iteration of the - // find_victim() loop. This can repeat indefinitely and on each - // iteration, A's iteration count increases by one. - // - // So, even though the semaphore for the permits is fair, the victim search - // itself happens in parallel and is not fair. - // Hence even with a permit, a task can theoretically be starved. - // To avoid this, we'd need tokio to give priority to tasks that are holding - // permits for longer. - // Note that just yielding to tokio during iteration without such - // priority boosting is likely counter-productive. We'd just give more opportunities - // for B to bump usage count, further starving A. - crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::EvictIterLimit, - ); - anyhow::bail!("exceeded evict iter limit"); + if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) { + unreachable!("find_victim_waiters prevents starvation"); } continue; } @@ -911,7 +889,8 @@ impl PageCache { inner.key = None; } crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64); - return Ok((slot_idx, inner)); + + return Ok(nostarve_position.complete_and_wait((slot_idx, inner)).await); } } } @@ -955,6 +934,7 @@ impl PageCache { next_evict_slot: AtomicUsize::new(0), size_metrics, pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), + find_victim_waiters: ::nostarve_queue::Queue::new(num_pages), } } }