diff --git a/Cargo.lock b/Cargo.lock index 94e181ebe1..5e8d3dd20f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1339,6 +1339,7 @@ dependencies = [ "tokio-stream", "tui", "walkdir", + "zenith_utils", ] [[package]] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 05f5213ac4..b161be36fe 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -40,3 +40,4 @@ hex = "0.4.3" tar = "0.4.33" postgres_ffi = { path = "../postgres_ffi" } +zenith_utils = { path = "../zenith_utils" } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 7eb741e5f2..7db50aee1f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -9,7 +9,7 @@ use crate::restore_local_repo::restore_timeline; use crate::ZTimelineId; use crate::{walredo, PageServerConf}; -use anyhow::bail; +use anyhow::{bail, Context}; use bytes::Bytes; use core::ops::Bound::Included; use crossbeam_channel::unbounded; @@ -18,12 +18,13 @@ use lazy_static::lazy_static; use log::*; use rand::Rng; use std::collections::{BTreeMap, HashMap}; -use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; use std::{convert::TryInto, ops::AddAssign}; +use zenith_utils::seqwait::SeqWait; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -35,7 +36,8 @@ pub struct PageCache { pub walredo_sender: Sender>, pub walredo_receiver: Receiver>, - valid_lsn_condvar: Condvar, + // Allows .await on the arrival of a particular LSN. + seqwait_lsn: SeqWait, // Counters, for metrics collection. pub num_entries: AtomicU64, @@ -48,6 +50,7 @@ pub struct PageCache { pub first_valid_lsn: AtomicU64, pub last_valid_lsn: AtomicU64, pub last_record_lsn: AtomicU64, + walreceiver_works: AtomicBool, } #[derive(Clone)] @@ -106,7 +109,6 @@ struct PageCacheShared { first_valid_lsn: u64, last_valid_lsn: u64, last_record_lsn: u64, - walreceiver_works: bool, } lazy_static! { @@ -170,9 +172,8 @@ fn init_page_cache() -> PageCache { first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, - walreceiver_works: false, }), - valid_lsn_condvar: Condvar::new(), + seqwait_lsn: SeqWait::new(0), walredo_sender: s, walredo_receiver: r, @@ -185,6 +186,7 @@ fn init_page_cache() -> PageCache { first_valid_lsn: AtomicU64::new(0), last_valid_lsn: AtomicU64::new(0), last_record_lsn: AtomicU64::new(0), + walreceiver_works: AtomicBool::new(false), } } @@ -276,7 +278,7 @@ impl PageCache { // // Returns an 8k page image // - pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result { + pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); // Look up cache entry. If it's a page image, return that. If it's a WAL record, @@ -284,50 +286,32 @@ impl PageCache { let minkey = CacheKey { tag, lsn: 0 }; let maxkey = CacheKey { tag, lsn }; - let entry_rc: Arc; - { - let mut shared = self.shared.lock().unwrap(); - let mut waited = false; - + if self.walreceiver_works.load(Ordering::Acquire) { + self.seqwait_lsn + .wait_for_timeout(lsn, TIMEOUT) + .await + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ) + })?; + } else { // There is a a race at postgres instance start // when we request a page before walsender established connection // and was able to stream the page. Just don't wait and return what we have. // TODO is there any corner case when this is incorrect? - if !shared.walreceiver_works { - trace!( - " walreceiver doesn't work yet last_valid_lsn {}, requested {}", - shared.last_valid_lsn, - lsn - ); - } + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + self.last_valid_lsn.load(Ordering::Acquire), + lsn + ); + } - if shared.walreceiver_works { - while lsn > shared.last_valid_lsn { - // TODO: Wait for the WAL receiver to catch up - waited = true; - trace!( - "not caught up yet: {}, requested {}", - shared.last_valid_lsn, - lsn - ); - let wait_result = self - .valid_lsn_condvar - .wait_timeout(shared, TIMEOUT) - .unwrap(); - - shared = wait_result.0; - if wait_result.1.timed_out() { - bail!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff - ); - } - } - } - if waited { - trace!("caught up now, continuing"); - } + let entry_rc: Arc; + { + let shared = self.shared.lock().unwrap(); if lsn < shared.first_valid_lsn { bail!( @@ -540,11 +524,11 @@ impl PageCache { if lsn >= oldlsn { // Now we receive entries from walreceiver and should wait if from_walreceiver { - shared.walreceiver_works = true; + self.walreceiver_works.store(true, Ordering::Release); } shared.last_valid_lsn = lsn; - self.valid_lsn_condvar.notify_all(); + self.seqwait_lsn.advance(lsn); self.last_valid_lsn.store(lsn, Ordering::Relaxed); } else { @@ -570,7 +554,7 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.valid_lsn_condvar.notify_all(); + self.seqwait_lsn.advance(lsn); self.last_valid_lsn.store(lsn, Ordering::Relaxed); self.last_record_lsn.store(lsn, Ordering::Relaxed); @@ -620,7 +604,7 @@ impl PageCache { // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) // // - pub fn _test_get_page_at_lsn(&self) { + pub async fn _test_get_page_at_lsn(&self) { // for quick testing of the get_page_at_lsn() funcion. // // Get a random page from the page cache. Apply all its WAL, by requesting @@ -650,7 +634,10 @@ impl PageCache { } info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { + match self + .get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) + .await + { Ok(_img) => { // This prints out the whole page image. //println!("{:X?}", img); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f95dd84039..99e93c3925 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -852,7 +852,7 @@ impl Connection { blknum: req.blkno, }; - let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) { + let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn).await { Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse { ok: true, n_blocks: 0,