Pull in squash of page_cache: find_victim: prevent starvation #5483

Squashed commit of the following:

commit 71bf9cf8ae
Author: Christian Schwarz <me@cschwarz.com>
Date:   Mon Oct 9 20:21:22 2023 +0000

    origin/problame/page-cache-forward-progress/3: trace spans and events only for tests

commit fd97c98dd9
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Oct 9 21:02:27 2023 +0200

    move into library

commit 05dbff7a18
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Oct 9 19:26:47 2023 +0200

    commented out the check for just-once-polled, works now, don't understand why though

commit 31632502aa
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Oct 9 17:54:44 2023 +0200

    fixes

commit 76d3e44588
Author: Christian Schwarz <christian@neon.tech>
Date:   Fri Oct 6 14:39:40 2023 +0200

    hand-roll it instead

commit a5912dcc1b
Author: Christian Schwarz <me@cschwarz.com>
Date:   Wed Oct 4 17:21:24 2023 +0000

    page_cache: find_victim: prevent starvation

commit da9a88a882
Author: Christian Schwarz <me@cschwarz.com>
Date:   Mon Oct 2 17:39:35 2023 +0000

    page_cache: ensure forward progress on cache miss
This commit is contained in:
Christian Schwarz
2023-11-29 11:51:20 +00:00
parent 2eb9f64978
commit f911050e31
8 changed files with 369 additions and 44 deletions

12
Cargo.lock generated
View File

@@ -2610,6 +2610,17 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nostarve_queue"
version = "0.1.0"
dependencies = [
"futures",
"rand 0.8.5",
"scopeguard",
"tokio",
"tracing",
]
[[package]]
name = "notify"
version = "5.2.0"
@@ -2951,6 +2962,7 @@ dependencies = [
"itertools",
"metrics",
"nix 0.26.2",
"nostarve_queue",
"num-traits",
"num_cpus",
"once_cell",

View File

@@ -27,6 +27,7 @@ members = [
"libs/postgres_ffi/wal_craft",
"libs/vm_monitor",
"libs/walproposer",
"libs/nostarve_queue",
]
[workspace.package]
@@ -37,6 +38,7 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-channel = "1.9.0"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.16"
azure_identity = "0.16"
@@ -191,6 +193,7 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
walproposer = { version = "0.1", path = "./libs/walproposer/" }
nostarve_queue = { path = "./libs/nostarve_queue" }
## Common library dependency
workspace_hack = { version = "0.1", path = "./workspace_hack/" }

View File

@@ -21,7 +21,7 @@ use pageserver_api::models::{
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
@@ -99,7 +99,7 @@ impl PageServerNode {
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
conf: conf.clone(),
env: env.clone(),
http_client: Client::new(),
http_client: ClientBuilder::new().timeout(None).build().unwrap(),
http_base_url: format!("http://{}/v1", conf.listen_http_addr),
}
}

View File

@@ -0,0 +1,14 @@
[package]
name = "nostarve_queue"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
scopeguard.workspace = true
tracing.workspace = true
[dev-dependencies]
futures.workspace = true
rand.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] }

View File

@@ -0,0 +1,316 @@
//! Synchronization primitive to prevent starvation among concurrent tasks that do the same work.
use std::{
collections::VecDeque,
fmt,
future::poll_fn,
sync::Mutex,
task::{Poll, Waker},
};
pub struct Queue<T> {
inner: Mutex<Inner<T>>,
}
struct Inner<T> {
waiters: VecDeque<usize>,
free: VecDeque<usize>,
slots: Vec<Option<(Option<Waker>, Option<T>)>>,
}
#[derive(Clone, Copy)]
pub struct Position<'q, T> {
idx: usize,
queue: &'q Queue<T>,
}
impl<T> fmt::Debug for Position<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Position").field("idx", &self.idx).finish()
}
}
impl<T> Inner<T> {
#[cfg(not(test))]
#[inline]
fn integrity_check(&self) {}
#[cfg(test)]
fn integrity_check(&self) {
use std::collections::HashSet;
let waiters = self.waiters.iter().copied().collect::<HashSet<_>>();
let free = self.free.iter().copied().collect::<HashSet<_>>();
for (slot_idx, slot) in self.slots.iter().enumerate() {
match slot {
None => {
assert!(!waiters.contains(&slot_idx));
assert!(free.contains(&slot_idx));
}
Some((None, None)) => {
assert!(waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
Some((Some(_), Some(_))) => {
assert!(!waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
Some((Some(_), None)) => {
assert!(waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
Some((None, Some(_))) => {
assert!(!waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
}
}
}
}
impl<T> Queue<T> {
pub fn new(size: usize) -> Self {
Queue {
inner: Mutex::new(Inner {
waiters: VecDeque::new(),
free: (0..size).collect(),
slots: {
let mut v = Vec::with_capacity(size);
v.resize_with(size, || None);
v
},
}),
}
}
pub fn begin(&self) -> Result<Position<T>, ()> {
#[cfg(test)]
tracing::trace!("get in line locking inner");
let mut inner = self.inner.lock().unwrap();
inner.integrity_check();
let my_waitslot_idx = inner
.free
.pop_front()
.expect("can't happen, len(slots) = len(waiters");
inner.waiters.push_back(my_waitslot_idx);
let prev = inner.slots[my_waitslot_idx].replace((None, None));
assert!(prev.is_none());
inner.integrity_check();
Ok(Position {
idx: my_waitslot_idx,
queue: &self,
})
}
}
impl<'q, T> Position<'q, T> {
pub fn complete_and_wait(self, datum: T) -> impl std::future::Future<Output = T> + 'q {
#[cfg(test)]
tracing::trace!("found victim locking waiters");
let mut inner = self.queue.inner.lock().unwrap();
inner.integrity_check();
let winner_idx = inner.waiters.pop_front().expect("we put ourselves in");
#[cfg(test)]
tracing::trace!(winner_idx, "putting victim into next waiters slot");
let winner_slot = inner.slots[winner_idx].as_mut().unwrap();
let prev = winner_slot.1.replace(datum);
assert!(
prev.is_none(),
"ensure we didn't mess up this simple ring buffer structure"
);
if let Some(waker) = winner_slot.0.take() {
#[cfg(test)]
tracing::trace!(winner_idx, "waking up winner");
waker.wake()
}
inner.integrity_check();
drop(inner); // the poll_fn locks it again
let mut poll_num = 0;
let mut drop_guard = Some(scopeguard::guard((), |()| {
panic!("must not drop this future until Ready");
}));
// take the victim that was found by someone else
poll_fn(move |cx| {
let my_waitslot_idx = self.idx;
poll_num += 1;
#[cfg(test)]
tracing::trace!(poll_num, "poll_fn locking waiters");
let mut inner = self.queue.inner.lock().unwrap();
inner.integrity_check();
let my_waitslot = inner.slots[self.idx].as_mut().unwrap();
// assert!(
// poll_num <= 2,
// "once we place the waker in the slot, next wakeup should have a result: {}",
// my_waitslot.1.is_some()
// );
if let Some(res) = my_waitslot.1.take() {
#[cfg(test)]
tracing::trace!(poll_num, "have cache slot");
// above .take() resets the waiters slot to None
debug_assert!(my_waitslot.0.is_none());
debug_assert!(my_waitslot.1.is_none());
inner.slots[my_waitslot_idx] = None;
inner.free.push_back(my_waitslot_idx);
let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap());
inner.integrity_check();
return Poll::Ready(res);
}
// assert_eq!(poll_num, 1);
if !my_waitslot
.0
.as_ref()
.map(|existing| cx.waker().will_wake(existing))
.unwrap_or(false)
{
let prev = my_waitslot.0.replace(cx.waker().clone());
#[cfg(test)]
tracing::trace!(poll_num, prev_is_some = prev.is_some(), "updating waker");
}
inner.integrity_check();
#[cfg(test)]
tracing::trace!(poll_num, "waiting to be woken up");
Poll::Pending
})
}
}
#[cfg(test)]
mod test {
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::Poll,
time::Duration,
};
use rand::RngCore;
#[tokio::test]
async fn in_order_completion_and_wait() {
let queue = super::Queue::new(2);
let q1 = queue.begin().unwrap();
let q2 = queue.begin().unwrap();
assert_eq!(q1.complete_and_wait(23).await, 23);
assert_eq!(q2.complete_and_wait(42).await, 42);
}
#[tokio::test]
async fn out_of_order_completion_and_wait() {
let queue = super::Queue::new(2);
let q1 = queue.begin().unwrap();
let q2 = queue.begin().unwrap();
let mut q2compfut = q2.complete_and_wait(23);
match futures::poll!(&mut q2compfut) {
Poll::Pending => {}
Poll::Ready(_) => panic!("should not be ready yet, it's queued after q1"),
}
let q1res = q1.complete_and_wait(42).await;
assert_eq!(q1res, 23);
let q2res = q2compfut.await;
assert_eq!(q2res, 42);
}
#[tokio::test]
async fn in_order_completion_out_of_order_wait() {
let queue = super::Queue::new(2);
let q1 = queue.begin().unwrap();
let q2 = queue.begin().unwrap();
let mut q1compfut = q1.complete_and_wait(23);
let mut q2compfut = q2.complete_and_wait(42);
match futures::poll!(&mut q2compfut) {
Poll::Pending => {
unreachable!("q2 should be ready, it wasn't first but q1 is serviced already")
}
Poll::Ready(x) => assert_eq!(x, 42),
}
assert_eq!(futures::poll!(&mut q1compfut), Poll::Ready(23));
}
#[tokio::test(flavor = "multi_thread")]
async fn stress() {
let ntasks = 8;
let queue_size = 8;
let queue = Arc::new(super::Queue::new(queue_size));
let stop = Arc::new(AtomicBool::new(false));
let mut tasks = vec![];
for i in 0..ntasks {
let jh = tokio::spawn({
let queue = Arc::clone(&queue);
let stop = Arc::clone(&stop);
async move {
while !stop.load(Ordering::Relaxed) {
let q = queue.begin().unwrap();
for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
std::hint::spin_loop();
}
q.complete_and_wait(i).await;
tokio::task::yield_now().await;
}
}
});
tasks.push(jh);
}
tokio::time::sleep(Duration::from_secs(10)).await;
stop.store(true, Ordering::Relaxed);
for t in tasks {
t.await.unwrap();
}
}
#[test]
fn stress_two_runtimes_shared_queue() {
std::thread::scope(|s| {
let ntasks = 8;
let queue_size = 8;
let queue = Arc::new(super::Queue::new(queue_size));
let stop = Arc::new(AtomicBool::new(false));
for i in 0..ntasks {
s.spawn({
let queue = Arc::clone(&queue);
let stop = Arc::clone(&stop);
move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
while !stop.load(Ordering::Relaxed) {
let q = queue.begin().unwrap();
for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
std::hint::spin_loop();
}
q.complete_and_wait(i).await;
tokio::task::yield_now().await;
}
});
}
});
}
std::thread::sleep(Duration::from_secs(10));
stop.store(true, Ordering::Relaxed);
});
}
}

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
nix.workspace = true
nostarve_queue.workspace = true
# hack to get the number of worker threads tokio uses
num_cpus = { version = "1.15" }
num-traits.workspace = true

View File

@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {

View File

@@ -83,6 +83,7 @@ use std::{
use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::instrument;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
@@ -252,6 +253,9 @@ pub struct PageCache {
next_evict_slot: AtomicUsize,
size_metrics: &'static PageCacheSizeMetrics,
find_victim_waiters:
nostarve_queue::Queue<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
@@ -430,8 +434,9 @@ impl PageCache {
///
/// Store an image of the given page in the cache.
///
#[cfg_attr(test, instrument(skip_all, level = "trace", fields(%key, %lsn)))]
pub async fn memorize_materialized_page(
&self,
&'static self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
@@ -522,8 +527,9 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
#[cfg_attr(test, instrument(skip_all, level = "trace", fields(?file_id, ?blkno)))]
pub async fn read_immutable_buf(
&self,
&'static self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
@@ -629,7 +635,7 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&self,
&'static self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
@@ -851,10 +857,15 @@ impl PageCache {
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let nostarve_position = self.find_victim_waiters.begin()
.expect("we initialize the nostarve queue to the same size as the slots semaphore, and the caller is presenting a permit");
let span = tracing::info_span!("find_victim", ?nostarve_position);
let _enter = span.enter();
let mut iters = 0;
loop {
iters += 1;
@@ -866,41 +877,8 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > iter_limit {
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
// any particular number of iterations: other threads might race ahead and acquire and
// release pins just as we're scanning the array.
//
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
// slots. There are two threads running concurrently, A and B. A has just
// acquired the permit from the semaphore.
//
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
// B: Acquire permit.
// B: Look at slot 2, decrement its usage_count to zero and continue the search
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
// B: Acquire permit.
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
//
// Now we're back in the starting situation that both slots have
// usage_count 1, but A has now been through one iteration of the
// find_victim() loop. This can repeat indefinitely and on each
// iteration, A's iteration count increases by one.
//
// So, even though the semaphore for the permits is fair, the victim search
// itself happens in parallel and is not fair.
// Hence even with a permit, a task can theoretically be starved.
// To avoid this, we'd need tokio to give priority to tasks that are holding
// permits for longer.
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
}
continue;
}
@@ -911,7 +889,8 @@ impl PageCache {
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
return Ok(nostarve_position.complete_and_wait((slot_idx, inner)).await);
}
}
}
@@ -955,6 +934,7 @@ impl PageCache {
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
find_victim_waiters: ::nostarve_queue::Queue::new(num_pages),
}
}
}