diff --git a/Cargo.lock b/Cargo.lock index be3f179d5f..d313f4b611 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,6 +158,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.0" @@ -1031,6 +1042,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "concurrent-queue" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const_format" version = "0.2.30" @@ -1452,6 +1472,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fail" version = "0.5.1" @@ -2674,6 +2700,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "async-compression", "async-stream", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 2b9da977e5..6a2ad5e909 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ license = "Apache-2.0" ## All dependency versions, used in the project [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } +async-channel = "1.9.0" async-compression = { version = "0.4.0", features = ["tokio", "gzip"] } flate2 = "1.0.26" async-stream = "0.3" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 3eb01003df..021e43984e 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -12,6 +12,7 @@ testing = ["fail/failpoints"] [dependencies] anyhow.workspace = true +async-channel.workspace = true async-compression.workspace = true async-stream.workspace = true async-trait.workspace = true diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 92ce654b7e..1938f1b154 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy = Lazy::new(|| { #[strum(serialize_all = "kebab_case")] pub(crate) enum PageCacheErrorKind { AcquirePinnedSlotTimeout, - EvictIterLimit, } pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index d7ac1ec336..95e26cb0cb 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -78,6 +78,7 @@ use std::{ atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, Arc, Weak, }, + task::Poll, time::Duration, }; @@ -251,6 +252,11 @@ pub struct PageCache { /// This is interpreted modulo the page cache size. next_evict_slot: AtomicUsize, + find_victim_sender: + async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, + find_victim_waiters: + async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, + size_metrics: &'static PageCacheSizeMetrics, } @@ -440,7 +446,7 @@ impl PageCache { /// Store an image of the given page in the cache. /// pub async fn memorize_materialized_page( - &self, + &'static self, tenant_id: TenantId, timeline_id: TimelineId, key: Key, @@ -532,7 +538,7 @@ impl PageCache { // Section 1.2: Public interface functions for working with immutable file pages. pub async fn read_immutable_buf( - &self, + &'static self, file_id: FileId, blkno: u32, ctx: &RequestContext, @@ -638,7 +644,7 @@ impl PageCache { /// ``` /// async fn lock_for_read( - &self, + &'static self, cache_key: &mut CacheKey, ctx: &RequestContext, ) -> anyhow::Result { @@ -860,10 +866,12 @@ impl PageCache { /// /// On return, the slot is empty and write-locked. async fn find_victim( - &self, + &'static self, _permit_witness: &PinnedSlotsPermit, ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { - let iter_limit = self.slots.len() * 10; + // Get in line. + let receiver = self.find_victim_waiters.recv(); + let mut iters = 0; loop { iters += 1; @@ -875,41 +883,8 @@ impl PageCache { let mut inner = match slot.inner.try_write() { Ok(inner) => inner, Err(_err) => { - if iters > iter_limit { - // NB: Even with the permits, there's no hard guarantee that we will find a slot with - // any particular number of iterations: other threads might race ahead and acquire and - // release pins just as we're scanning the array. - // - // Imagine that nslots is 2, and as starting point, usage_count==1 on all - // slots. There are two threads running concurrently, A and B. A has just - // acquired the permit from the semaphore. - // - // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search - // B: Acquire permit. - // B: Look at slot 2, decrement its usage_count to zero and continue the search - // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // B: Acquire permit. - // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // - // Now we're back in the starting situation that both slots have - // usage_count 1, but A has now been through one iteration of the - // find_victim() loop. This can repeat indefinitely and on each - // iteration, A's iteration count increases by one. - // - // So, even though the semaphore for the permits is fair, the victim search - // itself happens in parallel and is not fair. - // Hence even with a permit, a task can theoretically be starved. - // To avoid this, we'd need tokio to give priority to tasks that are holding - // permits for longer. - // Note that just yielding to tokio during iteration without such - // priority boosting is likely counter-productive. We'd just give more opportunities - // for B to bump usage count, further starving A. - crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::EvictIterLimit, - ); - anyhow::bail!("exceeded evict iter limit"); + if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) { + unreachable!("find_victim_waiters prevents starvation"); } continue; } @@ -920,7 +895,16 @@ impl PageCache { inner.key = None; } crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64); - return Ok((slot_idx, inner)); + self.find_victim_sender + .try_send((slot_idx, inner)) + .expect("we always get in line first"); + match futures::poll!(receiver) { + Poll::Ready(Ok(res)) => return Ok(res), + Poll::Ready(Err(_closed)) => unreachable!("we never close"), + Poll::Pending => { + unreachable!("we just sent to the channel and got in line earlier") + } + } } } } @@ -957,6 +941,7 @@ impl PageCache { }) .collect(); + let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages); Self { materialized_page_map: Default::default(), immutable_page_map: Default::default(), @@ -964,6 +949,8 @@ impl PageCache { next_evict_slot: AtomicUsize::new(0), size_metrics, pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), + find_victim_sender, + find_victim_waiters, } } }