diff --git a/Cargo.lock b/Cargo.lock index d313f4b611..be3f179d5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,17 +158,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - [[package]] name = "async-compression" version = "0.4.0" @@ -1042,15 +1031,6 @@ dependencies = [ "zstd", ] -[[package]] -name = "concurrent-queue" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "const_format" version = "0.2.30" @@ -1472,12 +1452,6 @@ dependencies = [ "libc", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "fail" version = "0.5.1" @@ -2700,7 +2674,6 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", - "async-channel", "async-compression", "async-stream", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 6a2ad5e909..2b9da977e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ license = "Apache-2.0" ## All dependency versions, used in the project [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } -async-channel = "1.9.0" async-compression = { version = "0.4.0", features = ["tokio", "gzip"] } flate2 = "1.0.26" async-stream = "0.3" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 021e43984e..3eb01003df 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -12,7 +12,6 @@ testing = ["fail/failpoints"] [dependencies] anyhow.workspace = true -async-channel.workspace = true 
async-compression.workspace = true async-stream.workspace = true async-trait.workspace = true diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 95e26cb0cb..e62df68e77 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -72,13 +72,14 @@ //! use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, VecDeque}, convert::TryInto, + future::poll_fn, sync::{ atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, - Arc, Weak, + Arc, Mutex, Weak, }, - task::Poll, + task::{Poll, Waker}, time::Duration, }; @@ -252,10 +253,16 @@ pub struct PageCache { /// This is interpreted modulo the page cache size. next_evict_slot: AtomicUsize, - find_victim_sender: - async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, - find_victim_waiters: - async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, + find_victim_waiters: Mutex<( + VecDeque<usize>, + VecDeque<usize>, + Vec< + Option<( + Option<Waker>, + Option<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>, + )>, + >, + )>, size_metrics: &'static PageCacheSizeMetrics, } @@ -870,7 +877,17 @@ impl PageCache { _permit_witness: &PinnedSlotsPermit, ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> { // Get in line. 
- let receiver = self.find_victim_waiters.recv(); + let my_idx = { + let mut guard = self.find_victim_waiters.lock().unwrap(); + let (waiters, free, slots) = &mut *guard; + let my_idx = free + .pop_front() + .expect("can't happen, len(slots) = len(waiters"); + waiters.push_back(my_idx); + let prev = slots[my_idx].replace((None, None)); + debug_assert!(prev.is_none()); + my_idx + }; let mut iters = 0; loop { @@ -895,16 +912,42 @@ impl PageCache { inner.key = None; } crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64); - self.find_victim_sender - .try_send((slot_idx, inner)) - .expect("we always get in line first"); - match futures::poll!(receiver) { - Poll::Ready(Ok(res)) => return Ok(res), - Poll::Ready(Err(_closed)) => unreachable!("we never close"), - Poll::Pending => { - unreachable!("we just sent to the channel and got in line earlier") + + // put in the victim we found + { + let mut slots_guard = self.find_victim_waiters.lock().unwrap(); + let (waiters, _, slots) = &mut *slots_guard; + let winner_idx = waiters.pop_front().expect("we put ourselves in"); + let winner_slot = slots[winner_idx].as_mut().unwrap(); + let prev = winner_slot.1.replace((slot_idx, inner)); + debug_assert!( + prev.is_none(), + "ensure we didn't mess up this simple ring buffer structure" + ); + if let Some(waker) = winner_slot.0.take() { + waker.wake() } } + + // take the victim that was found by someone else + return Ok(poll_fn(move |cx| { + let mut guard = self.find_victim_waiters.lock().unwrap(); + let (_, free, waiters) = &mut *guard; + + let my_slot = waiters[my_idx].as_mut().unwrap(); + if let Some(res) = my_slot.1.take() { + // .take() resets the waiters slot to None + free.push_back(my_idx); + debug_assert!(my_slot.0.is_none()); + debug_assert!(my_slot.1.is_none()); + waiters[my_idx] = None; + return Poll::Ready(res); + } + let prev = my_slot.0.replace(cx.waker().clone()); + debug_assert!(prev.is_none()); + Poll::Pending + }) + .await); } } } @@ -941,7 +984,6 
@@ impl PageCache { }) .collect(); - let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages); Self { materialized_page_map: Default::default(), immutable_page_map: Default::default(), @@ -949,8 +991,11 @@ impl PageCache { next_evict_slot: AtomicUsize::new(0), size_metrics, pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), - find_victim_sender, - find_victim_waiters, + find_victim_waiters: Mutex::new((VecDeque::new(), (0..num_pages).collect(), { + let mut v = Vec::with_capacity(num_pages); + v.resize_with(num_pages, || None); + v + })), } } }