page_cache: replace long mutex sleep with SeqWait
When calling into the page cache, it was possible to wait on a blocking mutex, which can stall the async executor. Replace that sleep with a SeqWait::wait_for(lsn).await, so that the executor can get on with other work while we wait.

Change walreceiver_works to an AtomicBool, to avoid the awkwardness of taking the lock, dropping it while we call wait_for, and then acquiring it again to do the real work.
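For reference, here is a minimal sketch of the wait/advance pattern the commit switches to, based only on the SeqWait calls that appear in the diff below (SeqWait::new, wait_for_timeout, advance). The LsnWaiter struct and its method names are illustrative, not the actual pageserver code:

    use std::time::Duration;

    use anyhow::Context;
    use zenith_utils::seqwait::SeqWait;

    struct LsnWaiter {
        // Seeded with the starting sequence number, as in
        // seqwait_lsn: SeqWait::new(0) in init_page_cache() below.
        seqwait_lsn: SeqWait,
    }

    impl LsnWaiter {
        // Reader side: await the arrival of `lsn` without blocking the
        // executor thread, failing if it doesn't arrive within `timeout`.
        async fn wait(&self, lsn: u64, timeout: Duration) -> anyhow::Result<()> {
            self.seqwait_lsn
                .wait_for_timeout(lsn, timeout)
                .await
                .with_context(|| format!("timed out waiting for LSN {:X}", lsn))?;
            Ok(())
        }

        // WAL-receiver side: publish a newly valid LSN, waking every task
        // that is waiting for a value at or below it.
        fn advance(&self, lsn: u64) {
            self.seqwait_lsn.advance(lsn);
        }
    }

The walreceiver_works AtomicBool acts as a fast path in front of the wait: a GetPage request that arrives before the WAL receiver has connected skips the wait and serves whatever is already valid, instead of sleeping until the timeout.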
@@ -1339,6 +1339,7 @@ dependencies = [
  "tokio-stream",
  "tui",
  "walkdir",
+ "zenith_utils",
 ]
 
 [[package]]
@@ -40,3 +40,4 @@ hex = "0.4.3"
 tar = "0.4.33"
 
 postgres_ffi = { path = "../postgres_ffi" }
+zenith_utils = { path = "../zenith_utils" }
@@ -9,7 +9,7 @@
 use crate::restore_local_repo::restore_timeline;
 use crate::ZTimelineId;
 use crate::{walredo, PageServerConf};
-use anyhow::bail;
+use anyhow::{bail, Context};
 use bytes::Bytes;
 use core::ops::Bound::Included;
 use crossbeam_channel::unbounded;
@@ -18,12 +18,13 @@ use lazy_static::lazy_static;
 use log::*;
 use rand::Rng;
 use std::collections::{BTreeMap, HashMap};
-use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
+use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::sync::{Arc, Condvar, Mutex};
 use std::thread;
 use std::time::Duration;
 use std::{convert::TryInto, ops::AddAssign};
+use zenith_utils::seqwait::SeqWait;
 
 // Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
 static TIMEOUT: Duration = Duration::from_secs(60);
@@ -35,7 +36,8 @@ pub struct PageCache {
     pub walredo_sender: Sender<Arc<CacheEntry>>,
     pub walredo_receiver: Receiver<Arc<CacheEntry>>,
 
-    valid_lsn_condvar: Condvar,
+    // Allows .await on the arrival of a particular LSN.
+    seqwait_lsn: SeqWait,
 
     // Counters, for metrics collection.
     pub num_entries: AtomicU64,
@@ -48,6 +50,7 @@ pub struct PageCache {
     pub first_valid_lsn: AtomicU64,
     pub last_valid_lsn: AtomicU64,
     pub last_record_lsn: AtomicU64,
+    walreceiver_works: AtomicBool,
 }
 
 #[derive(Clone)]
@@ -106,7 +109,6 @@ struct PageCacheShared {
     first_valid_lsn: u64,
     last_valid_lsn: u64,
     last_record_lsn: u64,
-    walreceiver_works: bool,
 }
 
 lazy_static! {
@@ -170,9 +172,8 @@ fn init_page_cache() -> PageCache {
             first_valid_lsn: 0,
             last_valid_lsn: 0,
             last_record_lsn: 0,
-            walreceiver_works: false,
         }),
-        valid_lsn_condvar: Condvar::new(),
+        seqwait_lsn: SeqWait::new(0),
 
         walredo_sender: s,
         walredo_receiver: r,
@@ -185,6 +186,7 @@ fn init_page_cache() -> PageCache {
         first_valid_lsn: AtomicU64::new(0),
         last_valid_lsn: AtomicU64::new(0),
         last_record_lsn: AtomicU64::new(0),
+        walreceiver_works: AtomicBool::new(false),
     }
 }
 
@@ -276,7 +278,7 @@ impl PageCache {
     //
     // Returns an 8k page image
     //
-    pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
+    pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
         self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
 
         // Look up cache entry. If it's a page image, return that. If it's a WAL record,
@@ -284,50 +286,32 @@ impl PageCache {
         let minkey = CacheKey { tag, lsn: 0 };
         let maxkey = CacheKey { tag, lsn };
 
-        let entry_rc: Arc<CacheEntry>;
-        {
-            let mut shared = self.shared.lock().unwrap();
-            let mut waited = false;
-
+        if self.walreceiver_works.load(Ordering::Acquire) {
+            self.seqwait_lsn
+                .wait_for_timeout(lsn, TIMEOUT)
+                .await
+                .with_context(|| {
+                    format!(
+                        "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
+                        lsn >> 32,
+                        lsn & 0xffff_ffff
+                    )
+                })?;
+        } else {
             // There is a race at postgres instance start
             // when we request a page before walsender established connection
             // and was able to stream the page. Just don't wait and return what we have.
             // TODO is there any corner case when this is incorrect?
-            if !shared.walreceiver_works {
-                trace!(
-                    " walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                    shared.last_valid_lsn,
-                    lsn
-                );
-            }
+            trace!(
+                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
+                self.last_valid_lsn.load(Ordering::Acquire),
+                lsn
+            );
+        }
 
-            if shared.walreceiver_works {
-                while lsn > shared.last_valid_lsn {
-                    // TODO: Wait for the WAL receiver to catch up
-                    waited = true;
-                    trace!(
-                        "not caught up yet: {}, requested {}",
-                        shared.last_valid_lsn,
-                        lsn
-                    );
-                    let wait_result = self
-                        .valid_lsn_condvar
-                        .wait_timeout(shared, TIMEOUT)
-                        .unwrap();
-
-                    shared = wait_result.0;
-                    if wait_result.1.timed_out() {
-                        bail!(
-                            "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
-                            lsn >> 32,
-                            lsn & 0xffff_ffff
-                        );
-                    }
-                }
-            }
-            if waited {
-                trace!("caught up now, continuing");
-            }
+        let entry_rc: Arc<CacheEntry>;
+        {
+            let shared = self.shared.lock().unwrap();
 
             if lsn < shared.first_valid_lsn {
                 bail!(
@@ -540,11 +524,11 @@ impl PageCache {
         if lsn >= oldlsn {
             // Now we receive entries from walreceiver and should wait
             if from_walreceiver {
-                shared.walreceiver_works = true;
+                self.walreceiver_works.store(true, Ordering::Release);
             }
 
             shared.last_valid_lsn = lsn;
-            self.valid_lsn_condvar.notify_all();
+            self.seqwait_lsn.advance(lsn);
 
             self.last_valid_lsn.store(lsn, Ordering::Relaxed);
         } else {
@@ -570,7 +554,7 @@ impl PageCache {
 
         shared.last_valid_lsn = lsn;
         shared.last_record_lsn = lsn;
-        self.valid_lsn_condvar.notify_all();
+        self.seqwait_lsn.advance(lsn);
 
         self.last_valid_lsn.store(lsn, Ordering::Relaxed);
         self.last_record_lsn.store(lsn, Ordering::Relaxed);
@@ -620,7 +604,7 @@ impl PageCache {
     // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
     //
     //
-    pub fn _test_get_page_at_lsn(&self) {
+    pub async fn _test_get_page_at_lsn(&self) {
         // for quick testing of the get_page_at_lsn() function.
         //
         // Get a random page from the page cache. Apply all its WAL, by requesting
@@ -650,7 +634,10 @@ impl PageCache {
         }
 
         info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
-        match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
+        match self
+            .get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee)
+            .await
+        {
             Ok(_img) => {
                 // This prints out the whole page image.
                 //println!("{:X?}", img);
@@ -852,7 +852,7 @@ impl Connection {
             blknum: req.blkno,
         };
 
-        let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
+        let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn).await {
             Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
                 ok: true,
                 n_blocks: 0,