Compare commits

...

1 Commit

Author: anastasia
SHA1: babd2339cc
Message: [issue #56] Fix race at postgres instance + walreceiver start. Uses postgres/vendor issue_56 branch.
         TODO: rebase on main
Date: 2021-04-22 15:51:44 +03:00
5 changed files with 20 additions and 22 deletions
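
In short, the commit drops the page cache's walreceiver_works flag and encodes "don't wait for the walreceiver" in the request itself: a GetPage request with LSN 0 is answered from whatever the cache already has, while a non-zero request LSN may block until the WAL receiver catches up. A minimal, self-contained sketch of that convention follows (illustration only, not the project's code; the helper names are invented, only the sentinel value is taken from the diff):

const LATEST_LSN_SENTINEL: u64 = 0xffff_ffff_ffff_eeee;

// A request LSN of 0 ("invalid") means: serve the newest page version we have.
fn effective_lsn(req_lsn: u64) -> u64 {
    if req_lsn == 0 {
        LATEST_LSN_SENTINEL
    } else {
        req_lsn
    }
}

// Only a real request LSN may block until the WAL receiver catches up;
// bootstrap requests (req_lsn == 0) never wait.
fn must_wait_for_wal(req_lsn: u64, last_valid_lsn: u64) -> bool {
    req_lsn != 0 && effective_lsn(req_lsn) > last_valid_lsn
}

fn main() {
    assert!(!must_wait_for_wal(0, 0)); // bootstrap: return the latest version, don't wait
    assert!(must_wait_for_wal(0x100, 0x80)); // request is ahead of last_valid_lsn: wait
    assert!(!must_wait_for_wal(0x100, 0x200)); // already valid: no wait
}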

View File

@@ -3,7 +3,6 @@ use control_plane::compute::ComputeControlPlane;
 use control_plane::local_env;
 use control_plane::local_env::PointInTime;
 use control_plane::storage::TestStorageControlPlane;
-use std::{thread, time};
 // XXX: force all redo at the end
 // -- restart + seqscan won't read deleted stuff
@@ -97,9 +96,6 @@ fn test_pageserver_two_timelines() {
     node1.start().unwrap();
     node2.start().unwrap();
-    //give walreceiver time to connect
-    thread::sleep(time::Duration::from_secs(3));
     // check node1
     node1.safe_psql(
         "postgres",

View File

@@ -106,7 +106,6 @@ struct PageCacheShared {
     first_valid_lsn: u64,
     last_valid_lsn: u64,
     last_record_lsn: u64,
-    walreceiver_works: bool,
 }
 lazy_static! {
@@ -170,7 +169,6 @@ fn init_page_cache() -> PageCache {
             first_valid_lsn: 0,
             last_valid_lsn: 0,
             last_record_lsn: 0,
-            walreceiver_works: false,
         }),
         valid_lsn_condvar: Condvar::new(),
@@ -276,9 +274,17 @@ impl PageCache {
     //
     // Returns an 8k page image
     //
-    pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
+    pub fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
         self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
+        let mut lsn = req_lsn;
+        //When invalid LSN is requested, it means "don't wait, return latest version of the page"
+        //This is necessary for bootstrap.
+        //TODO should we use last_valid_lsn here instead of maxvalue?
+        if lsn == 0
+        {
+            lsn = 0xffff_ffff_ffff_eeee;
+        }
         // Look up cache entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
         let minkey = CacheKey { tag, lsn: 0 };
@@ -292,16 +298,15 @@ impl PageCache {
             // There is a race at postgres instance start
             // when we request a page before walsender established connection
             // and was able to stream the page. Just don't wait and return what we have.
             // TODO is there any corner case when this is incorrect?
-            if !shared.walreceiver_works {
+            if req_lsn == 0
+            {
                 trace!(
-                    " walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                    shared.last_valid_lsn,
-                    lsn
-                );
+                    "walsender hasn't started yet. Don't wait. last_valid_lsn {}, requested {}",
+                    shared.last_valid_lsn, lsn);
             }
-            if shared.walreceiver_works {
+            if req_lsn != 0
+            {
             while lsn > shared.last_valid_lsn {
                 // TODO: Wait for the WAL receiver to catch up
                 waited = true;
@@ -325,6 +330,7 @@ impl PageCache {
                 }
             }
         }
         if waited {
             trace!("caught up now, continuing");
         }
@@ -532,16 +538,12 @@ impl PageCache {
     }
     //
-    pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) {
+    pub fn advance_last_valid_lsn(&self, lsn: u64) {
         let mut shared = self.shared.lock().unwrap();
         // Can't move backwards.
         let oldlsn = shared.last_valid_lsn;
         if lsn >= oldlsn {
-            // Now we receive entries from walreceiver and should wait
-            if from_walreceiver {
-                shared.walreceiver_works = true;
-            }
             shared.last_valid_lsn = lsn;
             self.valid_lsn_condvar.notify_all();
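
The wait/notify pairing between get_page_at_lsn and the simplified advance_last_valid_lsn is easier to see outside the diff. Below is a small, self-contained sketch of the same pattern using only std (not the project's actual code; LsnWaiter and its field layout are invented for illustration): a reader blocks on a Condvar until last_valid_lsn reaches the requested LSN, with a timeout so a stalled walreceiver cannot hang it forever, while the writer advances the LSN monotonically and wakes all waiters.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Shared LSN bookkeeping, analogous to PageCacheShared in the diff
// (only the field needed for this sketch is kept).
struct Shared {
    last_valid_lsn: u64,
}

struct LsnWaiter {
    shared: Mutex<Shared>,
    valid_lsn_condvar: Condvar,
}

impl LsnWaiter {
    // What the GetPage@LSN path does for a non-zero request LSN: block until
    // last_valid_lsn has reached `lsn`, with a timeout so the caller cannot
    // hang forever if WAL never arrives.
    fn wait_for(&self, lsn: u64) -> bool {
        let mut shared = self.shared.lock().unwrap();
        while shared.last_valid_lsn < lsn {
            let (guard, res) = self
                .valid_lsn_condvar
                .wait_timeout(shared, Duration::from_secs(10))
                .unwrap();
            shared = guard;
            if res.timed_out() {
                return false;
            }
        }
        true
    }

    // What advance_last_valid_lsn does after this change: advance monotonically
    // and wake up any waiting readers.
    fn advance(&self, lsn: u64) {
        let mut shared = self.shared.lock().unwrap();
        if lsn >= shared.last_valid_lsn {
            shared.last_valid_lsn = lsn;
            self.valid_lsn_condvar.notify_all();
        }
    }
}

fn main() {
    let cache = Arc::new(LsnWaiter {
        shared: Mutex::new(Shared { last_valid_lsn: 0 }),
        valid_lsn_condvar: Condvar::new(),
    });

    let waiter = Arc::clone(&cache);
    let handle = thread::spawn(move || waiter.wait_for(100));

    thread::sleep(Duration::from_millis(50)); // let the reader block first
    cache.advance(100); // the walreceiver (or WAL restore) reports progress

    assert!(handle.join().unwrap());
}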

View File

@@ -325,7 +325,7 @@ fn restore_wal(
             }
             // Now that this record has been handled, let the page cache know that
             // it is up-to-date to this LSN
-            pcache.advance_last_valid_lsn(lsn, false);
+            pcache.advance_last_valid_lsn(lsn);
             last_lsn = lsn;
         } else {
             break;

View File

@@ -252,7 +252,7 @@ async fn walreceiver_main(
         // better reflect that, because GetPage@LSN requests might also point in the
         // middle of a record, if the request LSN was taken from the server's current
         // flush ptr.
-        pcache.advance_last_valid_lsn(endlsn, true);
+        pcache.advance_last_valid_lsn(endlsn);
         if !caught_up && endlsn >= end_of_wal {
             info!(