mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Wait for WAL receiver to start
This commit is contained in:
@@ -400,11 +400,11 @@ impl PostgresNode {
|
||||
}
|
||||
|
||||
fn dump_log_file(&self) {
|
||||
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- Dump pageserver.log:\n{}", buffer);
|
||||
}
|
||||
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- Dump pageserver.log:\n{}", buffer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
|
||||
|
||||
@@ -497,35 +497,36 @@ impl PageCache {
|
||||
}
|
||||
|
||||
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
|
||||
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
|
||||
if walreceiver_works {
|
||||
self.seqwait_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
)
|
||||
})?;
|
||||
} else {
|
||||
// There is a a race at postgres instance start
|
||||
// when we request a page before walsender established connection
|
||||
// and was able to stream the page. Just don't wait and return what we have.
|
||||
// TODO is there any corner case when this is incorrect?
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
self.last_valid_lsn.load(Ordering::Acquire),
|
||||
lsn
|
||||
);
|
||||
loop {
|
||||
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
|
||||
if walreceiver_works {
|
||||
self.seqwait_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
)
|
||||
})?;
|
||||
break;
|
||||
} else {
|
||||
// There is a a race at postgres instance start
|
||||
// when we request a page before walsender established connection
|
||||
// and was able to stream the page. Just don't wait and return what we have.
|
||||
// TODO is there any corner case when this is incorrect?
|
||||
info!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
self.last_valid_lsn.load(Ordering::Acquire),
|
||||
lsn
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
let shared = self.shared.lock().unwrap();
|
||||
|
||||
if walreceiver_works {
|
||||
assert!(lsn <= shared.last_valid_lsn);
|
||||
}
|
||||
assert!(lsn <= shared.last_valid_lsn);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -857,13 +858,13 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
let relsize = tag.blknum + 1;
|
||||
info!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
|
||||
trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
|
||||
return Ok(relsize);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
info!("Size of relation {:?} at {} is zero", rel, lsn);
|
||||
trace!("Size of relation {:?} at {} is zero", rel, lsn);
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
@@ -888,11 +889,11 @@ impl PageCache {
|
||||
buf.extend_from_slice(&k);
|
||||
let tag = BufferTag::unpack(&mut buf);
|
||||
if tag.rel == *rel {
|
||||
info!("Relation {:?} exists at {}", rel, lsn);
|
||||
trace!("Relation {:?} exists at {}", rel, lsn);
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
info!("Relation {:?} doesn't exist at {}", rel, lsn);
|
||||
trace!("Relation {:?} doesn't exist at {}", rel, lsn);
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user