diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 8c16a1a926..00d9acee61 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -17,7 +17,7 @@ use rocksdb;
 use std::cmp::min;
 use std::collections::HashMap;
 use std::sync::atomic::Ordering;
-use std::sync::atomic::{AtomicBool, AtomicU64};
+use std::sync::atomic::AtomicU64;
 use std::sync::{Arc, Condvar, Mutex};
 use std::thread;
 use std::time::Duration;
@@ -51,7 +51,6 @@ pub struct PageCache {
     pub first_valid_lsn: AtomicU64,
     pub last_valid_lsn: AtomicU64,
     pub last_record_lsn: AtomicU64,
-    walreceiver_works: AtomicBool,
 }
 
 #[derive(Clone)]
@@ -200,7 +199,6 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
         first_valid_lsn: AtomicU64::new(0),
         last_valid_lsn: AtomicU64::new(0),
         last_record_lsn: AtomicU64::new(0),
-        walreceiver_works: AtomicBool::new(false),
     }
 }
 
@@ -497,36 +495,17 @@ impl PageCache {
     }
 
     async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
-        loop {
-            let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
-            if walreceiver_works {
-                self.seqwait_lsn
-                    .wait_for_timeout(lsn, TIMEOUT)
-                    .await
-                    .with_context(|| {
-                        format!(
-                            "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
-                            lsn >> 32,
-                            lsn & 0xffff_ffff
-                        )
-                    })?;
-                break;
-            } else {
-                // There is a a race at postgres instance start
-                // when we request a page before walsender established connection
-                // and was able to stream the page. Just don't wait and return what we have.
-                // TODO is there any corner case when this is incorrect?
-                info!(
-                    "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                    self.last_valid_lsn.load(Ordering::Acquire),
-                    lsn
-                );
-                tokio::time::sleep(Duration::from_secs(1)).await;
-            }
-        }
+        self.seqwait_lsn
+            .wait_for_timeout(lsn, TIMEOUT)
+            .await
+            .with_context(|| {
+                format!(
+                    "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
+                    lsn >> 32,
+                    lsn & 0xffff_ffff
+                )
+            })?;
 
-        let shared = self.shared.lock().unwrap();
-        assert!(lsn <= shared.last_valid_lsn);
         Ok(())
     }
 
@@ -535,10 +514,19 @@ impl PageCache {
     //
     // Returns an 8k page image
     //
-    pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result {
+    pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result {
         self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
 
-        self.wait_lsn(lsn).await?;
+        // A request LSN of 0 (the invalid LSN) means "don't wait, return the latest
+        // version of the page". This is necessary for bootstrap.
+        let lsn = if req_lsn == 0 {
+            let last_valid_lsn = self.last_valid_lsn.load(Ordering::Acquire);
+            trace!("invalid LSN requested, using last_valid_lsn {}", last_valid_lsn);
+            last_valid_lsn
+        } else {
+            self.wait_lsn(req_lsn).await?;
+            req_lsn
+        };
 
         // Look up cache entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -738,16 +726,12 @@ impl PageCache {
     }
 
     //
-    pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) {
+    pub fn advance_last_valid_lsn(&self, lsn: u64) {
         let mut shared = self.shared.lock().unwrap();
 
         // Can't move backwards.
         let oldlsn = shared.last_valid_lsn;
         if lsn >= oldlsn {
-            // Now we receive entries from walreceiver and should wait
-            if from_walreceiver {
-                self.walreceiver_works.store(true, Ordering::Release);
-            }
             shared.last_valid_lsn = lsn;
             self.seqwait_lsn.advance(lsn);
 
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index c9cb050b8d..d090419b0f 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -323,7 +323,7 @@ fn restore_wal(
                 }
                 // Now that this record has been handled, let the page cache know that
                 // it is up-to-date to this LSN
-                pcache.advance_last_valid_lsn(lsn, false);
+                pcache.advance_last_valid_lsn(lsn);
                 last_lsn = lsn;
             } else {
                 break;
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index a70fbb9c02..20d6551630 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -283,7 +283,7 @@ async fn walreceiver_main(
                     // better reflect that, because GetPage@LSN requests might also point in the
                     // middle of a record, if the request LSN was taken from the server's current
                     // flush ptr.
-                    pcache.advance_last_valid_lsn(endlsn, true);
+                    pcache.advance_last_valid_lsn(endlsn);
 
                     if !caught_up && endlsn >= end_of_wal {
                         info!(
diff --git a/vendor/postgres b/vendor/postgres
index daec929ec3..af9c507616 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit daec929ec3f357f1af19b33fa6862acaa2fcf34d
+Subproject commit af9c50761691c9e6890eecc0396ba85177958ba8