mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
[issue #56] Fix race at postgres instance + walreceiver start. Uses postgres/vendor issue_56_rebased branch.
This commit is contained in:
@@ -17,7 +17,7 @@ use rocksdb;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64};
|
||||
use std::sync::atomic::{AtomicU64};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
@@ -51,7 +51,6 @@ pub struct PageCache {
|
||||
pub first_valid_lsn: AtomicU64,
|
||||
pub last_valid_lsn: AtomicU64,
|
||||
pub last_record_lsn: AtomicU64,
|
||||
walreceiver_works: AtomicBool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -200,7 +199,6 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
|
||||
first_valid_lsn: AtomicU64::new(0),
|
||||
last_valid_lsn: AtomicU64::new(0),
|
||||
last_record_lsn: AtomicU64::new(0),
|
||||
walreceiver_works: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -497,36 +495,17 @@ impl PageCache {
|
||||
}
|
||||
|
||||
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
|
||||
if walreceiver_works {
|
||||
self.seqwait_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
)
|
||||
})?;
|
||||
break;
|
||||
} else {
|
||||
// There is a a race at postgres instance start
|
||||
// when we request a page before walsender established connection
|
||||
// and was able to stream the page. Just don't wait and return what we have.
|
||||
// TODO is there any corner case when this is incorrect?
|
||||
info!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
self.last_valid_lsn.load(Ordering::Acquire),
|
||||
lsn
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
self.seqwait_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
)
|
||||
})?;
|
||||
|
||||
let shared = self.shared.lock().unwrap();
|
||||
assert!(lsn <= shared.last_valid_lsn);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -535,10 +514,25 @@ impl PageCache {
|
||||
//
|
||||
// Returns an 8k page image
|
||||
//
|
||||
pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
|
||||
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
|
||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
self.wait_lsn(lsn).await?;
|
||||
let mut lsn = req_lsn;
|
||||
//When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
//This is necessary for bootstrap.
|
||||
if lsn == 0
|
||||
{
|
||||
lsn = self.last_valid_lsn.load(Ordering::Acquire);
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
self.last_valid_lsn.load(Ordering::Acquire),
|
||||
lsn
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
self.wait_lsn(lsn).await?;
|
||||
}
|
||||
|
||||
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||
@@ -738,16 +732,12 @@ impl PageCache {
|
||||
}
|
||||
|
||||
//
|
||||
pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) {
|
||||
pub fn advance_last_valid_lsn(&self, lsn: u64) {
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
|
||||
// Can't move backwards.
|
||||
let oldlsn = shared.last_valid_lsn;
|
||||
if lsn >= oldlsn {
|
||||
// Now we receive entries from walreceiver and should wait
|
||||
if from_walreceiver {
|
||||
self.walreceiver_works.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
shared.last_valid_lsn = lsn;
|
||||
self.seqwait_lsn.advance(lsn);
|
||||
|
||||
@@ -323,7 +323,7 @@ fn restore_wal(
|
||||
}
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
pcache.advance_last_valid_lsn(lsn, false);
|
||||
pcache.advance_last_valid_lsn(lsn);
|
||||
last_lsn = lsn;
|
||||
} else {
|
||||
break;
|
||||
|
||||
@@ -283,7 +283,7 @@ async fn walreceiver_main(
|
||||
// better reflect that, because GetPage@LSN requests might also point in the
|
||||
// middle of a record, if the request LSN was taken from the server's current
|
||||
// flush ptr.
|
||||
pcache.advance_last_valid_lsn(endlsn, true);
|
||||
pcache.advance_last_valid_lsn(endlsn);
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
info!(
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: daec929ec3...af9c507616
Reference in New Issue
Block a user