diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index e62df68e77..24e328c357 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -876,17 +876,56 @@ impl PageCache { &'static self, _permit_witness: &PinnedSlotsPermit, ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { + let integrity_check = |guard: &std::sync::MutexGuard<( + VecDeque, + VecDeque, + Vec< + Option<( + Option, + Option<(usize, tokio::sync::RwLockWriteGuard)>, + )>, + >, + )>| { + let (waiters, free, slots) = &**guard; + for (slot_idx, slot) in slots.iter().enumerate() { + match slot { + None => { + assert!(!waiters.contains(&slot_idx)); + assert!(free.contains(&slot_idx)); + } + Some((None, None)) => { + assert!(waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + Some((Some(_), Some(_))) => { + assert!(!waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + Some((Some(_), None)) => { + assert!(waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + Some((None, Some(_))) => { + assert!(!waiters.contains(&slot_idx)); + assert!(!free.contains(&slot_idx)); + } + } + } + }; + // Get in line. - let my_idx = { + let my_waitslot_idx = { let mut guard = self.find_victim_waiters.lock().unwrap(); - let (waiters, free, slots) = &mut *guard; - let my_idx = free + integrity_check(&guard); + let (waiters, free, waitslot) = &mut *guard; + let my_waitslot_idx = free .pop_front() .expect("can't happen, len(slots) = len(waiters"); - waiters.push_back(my_idx); - let prev = slots[my_idx].replace((None, None)); + waiters.push_back(my_waitslot_idx); + let prev = waitslot[my_waitslot_idx].replace((None, None)); debug_assert!(prev.is_none()); - my_idx + integrity_check(&guard); + my_waitslot_idx }; let mut iters = 0; @@ -915,10 +954,11 @@ impl PageCache { // put in the victim we found { - let mut slots_guard = self.find_victim_waiters.lock().unwrap(); - let (waiters, _, slots) = &mut *slots_guard; + let mut guard = self.find_victim_waiters.lock().unwrap(); + integrity_check(&guard); + let (waiters, _, waitslots) = &mut *guard; let winner_idx = waiters.pop_front().expect("we put ourselves in"); - let winner_slot = slots[winner_idx].as_mut().unwrap(); + let winner_slot = waitslots[winner_idx].as_mut().unwrap(); let prev = winner_slot.1.replace((slot_idx, inner)); debug_assert!( prev.is_none(), @@ -927,27 +967,53 @@ impl PageCache { if let Some(waker) = winner_slot.0.take() { waker.wake() } + integrity_check(&guard); } - // take the victim that was found by someone else - return Ok(poll_fn(move |cx| { - let mut guard = self.find_victim_waiters.lock().unwrap(); - let (_, free, waiters) = &mut *guard; + let mut poll_num = 0; + let mut drop_guard = Some(scopeguard::guard((), |()| { + panic!("must not drop this future until Ready"); + })); - let my_slot = waiters[my_idx].as_mut().unwrap(); - if let Some(res) = my_slot.1.take() { - // .take() resets the waiters slot to None - free.push_back(my_idx); - debug_assert!(my_slot.0.is_none()); - debug_assert!(my_slot.1.is_none()); - waiters[my_idx] = None; + // take the victim that was found by someone else + let mut fut = poll_fn(move |cx| { + let mut guard = self.find_victim_waiters.lock().unwrap(); + integrity_check(&guard); + let (_, free, waitslots) = &mut *guard; + let my_waitslot = waitslots[my_waitslot_idx].as_mut().unwrap(); + poll_num += 1; + assert!( + poll_num <= 2, + "once we place the waker in the slot, next wakeup should have a result: {}", + my_waitslot.1.is_some() + ); + if let Some(res) = my_waitslot.1.take() { + // above .take() resets the waiters slot to None + free.push_back(my_waitslot_idx); + debug_assert!(my_waitslot.0.is_none()); + debug_assert!(my_waitslot.1.is_none()); + waitslots[my_waitslot_idx] = None; + let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap()); + integrity_check(&guard); return Poll::Ready(res); } - let prev = my_slot.0.replace(cx.waker().clone()); + assert_eq!(poll_num, 1); + let prev = my_waitslot.0.replace(cx.waker().clone()); debug_assert!(prev.is_none()); + integrity_check(&guard); Poll::Pending - }) - .await); + }); + loop { + match tokio::time::timeout(Duration::from_secs(1), &mut fut).await { + Ok(res) => return Ok(res), + Err(_) => { + tracing::warn!( + "find_victim: timeout waiting for victim\n{}", + std::backtrace::Backtrace::force_capture() + ); + } + } + } } } }