From 2f27412d7d4a19dd39d5fc2ecc2b0395b2655bd7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 4 Oct 2023 17:21:24 +0000 Subject: [PATCH] starvation prevention v2 while allowing concurrent find_victims --- Cargo.lock | 27 ++++++++++++++ pageserver/Cargo.toml | 1 + pageserver/src/page_cache.rs | 69 ++++++++++++++---------------------- 3 files changed, 55 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36e7069eb1..3e85fe2f9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,6 +158,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.0" @@ -1015,6 +1026,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "concurrent-queue" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const_format" version = "0.2.30" @@ -1435,6 +1455,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fail" version = "0.5.1" @@ -2656,6 +2682,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "async-compression", "async-stream", "async-trait", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 9cb71dea09..6704171377 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -81,6 +81,7 @@ enumset.workspace = true strum.workspace = true strum_macros.workspace = true tempfile.workspace = true +async-channel = "1.9.0" [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 0f97dc7443..ec20f4478b 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -79,6 +79,7 @@ use std::{ atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, Arc, Weak, }, + task::Poll, time::Duration, }; @@ -252,6 +253,11 @@ pub struct PageCache { /// This is interpreted modulo the page cache size. next_evict_slot: AtomicUsize, + find_victim_sender: + async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, + find_victim_waiters: + async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, + size_metrics: &'static PageCacheSizeMetrics, } @@ -449,7 +455,7 @@ impl PageCache { /// Store an image of the given page in the cache. /// pub async fn memorize_materialized_page( - &self, + &'static self, tenant_id: TenantId, timeline_id: TimelineId, key: Key, @@ -484,7 +490,7 @@ impl PageCache { // Section 1.2: Public interface functions for working with immutable file pages. 
     pub async fn read_immutable_buf(
-        &self,
+        &'static self,
         file_id: FileId,
         blkno: u32,
         ctx: &RequestContext,
@@ -590,7 +596,7 @@ impl PageCache {
     /// ```
     ///
     async fn lock_for_read(
-        &self,
+        &'static self,
         cache_key: &mut CacheKey,
         ctx: &RequestContext,
     ) -> anyhow::Result<ReadBufResult> {
@@ -692,7 +698,7 @@ impl PageCache {
     ///
     /// Similar to lock_for_read(), but the returned buffer is write-locked and
     /// may be modified by the caller even if it's already found in the cache.
-    async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
+    async fn lock_for_write(&'static self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
         let mut permit = Some(self.try_get_pinned_slot_permit().await?);
         loop {
             // First check if the key already exists in the cache.
@@ -892,10 +898,12 @@ impl PageCache {
     ///
     /// On return, the slot is empty and write-locked.
     async fn find_victim(
-        &self,
+        &'static self,
         _permit_witness: &PinnedSlotsPermit,
     ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
-        let iter_limit = self.slots.len() * 10;
+        // Get in line.
+        let receiver = self.find_victim_waiters.recv();
+
         let mut iters = 0;
         loop {
             iters += 1;
@@ -907,41 +915,8 @@ impl PageCache {
             let mut inner = match slot.inner.try_write() {
                 Ok(inner) => inner,
                 Err(_err) => {
-                    if iters > iter_limit {
-                        // NB: Even with the permits, there's no hard guarantee that we will find a slot with
-                        // any particular number of iterations: other threads might race ahead and acquire and
-                        // release pins just as we're scanning the array.
-                        //
-                        // Imagine that nslots is 2, and as starting point, usage_count==1 on all
-                        // slots. There are two threads running concurrently, A and B. A has just
-                        // acquired the permit from the semaphore.
-                        //
-                        // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
-                        // B: Acquire permit.
-                        // B: Look at slot 2, decrement its usage_count to zero and continue the search
-                        // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
-                        // B: Release pin and permit again
-                        // B: Acquire permit.
-                        // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
-                        // B: Release pin and permit again
-                        //
-                        // Now we're back in the starting situation that both slots have
-                        // usage_count 1, but A has now been through one iteration of the
-                        // find_victim() loop. This can repeat indefinitely and on each
-                        // iteration, A's iteration count increases by one.
-                        //
-                        // So, even though the semaphore for the permits is fair, the victim search
-                        // itself happens in parallel and is not fair.
-                        // Hence even with a permit, a task can theoretically be starved.
-                        // To avoid this, we'd need tokio to give priority to tasks that are holding
-                        // permits for longer.
-                        // Note that just yielding to tokio during iteration without such
-                        // priority boosting is likely counter-productive. We'd just give more opportunities
-                        // for B to bump usage count, further starving A.
- crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::EvictIterLimit, - ); - anyhow::bail!("exceeded evict iter limit"); + if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) { + unreachable!("find_victim_waiters prevents starvation"); } continue; } @@ -952,7 +927,14 @@ impl PageCache { inner.key = None; } crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64); - return Ok((slot_idx, inner)); + self.find_victim_sender + .try_send((slot_idx, inner)) + .expect("we always get in line first"); + match futures::poll!(receiver) { + Poll::Ready(Ok(res)) => return Ok(res), + Poll::Ready(Err(_closed)) => unreachable!("we never close"), + Poll::Pending => unreachable!("we just sent to the channel and got in line earlier"), + } } } } @@ -989,6 +971,7 @@ impl PageCache { }) .collect(); + let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages); Self { materialized_page_map: Default::default(), immutable_page_map: Default::default(), @@ -996,6 +979,8 @@ impl PageCache { next_evict_slot: AtomicUsize::new(0), size_metrics, pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), + find_victim_sender, + find_victim_waiters, } } }
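
--
Standalone illustration of the hand-off scheme in find_victim(): every
searcher gets in line on the FIFO channel before it starts scanning, and
whoever finds a free slot sends that slot into the channel instead of keeping
it, so the slot goes to the longest-waiting searcher rather than the fastest
scanner. The sketch below is hypothetical and not part of the patch: it
assumes only the tokio and async-channel crates, all names in it (slots, tx,
rx, waiter) are invented, and, unlike the patch, it awaits the receive
operation instead of polling it exactly once.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Four "slots" standing in for page cache slots; `true` means claimed.
    let slots: Arc<Vec<AtomicBool>> =
        Arc::new((0..4).map(|_| AtomicBool::new(false)).collect());
    // Bounded MPMC channel used purely as a fair, FIFO hand-off queue.
    // Capacity == slot count, so `try_send` below can never fail: every
    // in-flight message corresponds to exactly one claimed slot.
    let (tx, rx) = async_channel::bounded::<usize>(slots.len());

    let mut tasks = Vec::new();
    for id in 0..8u32 {
        let slots = Arc::clone(&slots);
        let (tx, rx) = (tx.clone(), rx.clone());
        tasks.push(tokio::spawn(async move {
            // Get in line before searching, mirroring the patch.
            let waiter = rx.recv();
            // Search for a free slot (stand-in for the clock sweep).
            loop {
                let found = slots.iter().position(|s| {
                    s.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                        .is_ok()
                });
                if let Some(idx) = found {
                    // Hand the found slot to the head of the line (which may
                    // be ourselves) rather than using it directly.
                    tx.try_send(idx).expect("capacity is bounded by slot count");
                    break;
                }
                tokio::task::yield_now().await;
            }
            // Take whichever slot the queue assigns to this task.
            let got = waiter.await.expect("channel is never closed");
            println!("task {id} was handed slot {got}");
            // Release the slot so a still-searching task can claim it.
            slots[got].store(false, Ordering::Release);
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}

The fairness comes entirely from the channel's FIFO delivery; the search
itself stays concurrent and unfair, which is the property the patch preserves
while still bounding how long any one task can wait.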