[refer #439] Correctly handle LSN parameter in BASEBACKUP command

This commit is contained in:
Konstantin Knizhnik
2021-08-19 19:53:22 +03:00
parent 5eb1738e8b
commit ead94feb05
11 changed files with 73 additions and 44 deletions

1
Cargo.lock generated
View File

@@ -2283,6 +2283,7 @@ name = "walkeeper"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"bytes",
"clap",

View File

@@ -714,6 +714,41 @@ impl Timeline for ObjectTimeline {
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory { lsn, iter }))
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, req_lsn: Lsn) -> Result<Lsn> {
let mut lsn = req_lsn;
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
}
impl ObjectTimeline {
@@ -822,40 +857,6 @@ impl ObjectTimeline {
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
///
/// Iterate through object versions with given key, in reverse LSN order.
///

View File

@@ -357,8 +357,13 @@ impl PageServerHandler {
/* Send a tarball of the latest snapshot on the timeline */
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
let req_lsn = match lsn {
Some(lsn) => {
timeline.wait_lsn(lsn)?;
lsn
}
None => timeline.get_last_valid_lsn()
};
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(
@@ -468,8 +473,9 @@ impl postgres_backend::Handler for PageServerHandler {
} else if query_string.starts_with("basebackup ") {
let (_, params_raw) = query_string.split_at("basebackup ".len());
let params = params_raw.split(" ").collect::<Vec<_>>();
info!("params.len()={}, params[2].len()={}", params.len(), params[2].len());
ensure!(
params.len() == 2,
params.len() >= 2,
"invalid param number for basebackup command"
);
@@ -479,7 +485,7 @@ impl postgres_backend::Handler for PageServerHandler {
self.check_permission(Some(tenantid))?;
// TODO are there any tests with lsn option?
let lsn = if params.len() == 3 {
let lsn = if params.len() == 3 && params[2].len() != 0 {
Some(Lsn::from_str(params[2])?)
} else {
None
@@ -575,6 +581,10 @@ impl postgres_backend::Handler for PageServerHandler {
timeline.advance_last_valid_lsn(last_lsn);
break;
}
FeMessage::CopyFailed => {
info!("Copy failed");
break;
}
FeMessage::Sync => {}
_ => bail!("unexpected message {:?}", msg),
}

View File

@@ -158,6 +158,11 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
}
pub trait History: Iterator<Item = Result<Modification>> {

View File

@@ -207,7 +207,7 @@ fn walreceiver_main(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
while let Ok(Some((lsn, recdata))) = waldecoder.poll_decode() {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());

View File

@@ -214,10 +214,9 @@ pub const PGDATA_SUBDIRS: [&'static str; 22] = [
"pg_logical/mappings",
];
pub const PGDATA_SPECIAL_FILES: [&'static str; 4] = [
pub const PGDATA_SPECIAL_FILES: [&'static str; 3] = [
"pg_hba.conf",
"pg_ident.conf",
"postgresql.conf",
"postgresql.auto.conf",
];

View File

@@ -299,6 +299,7 @@ impl<'pg> ReceiveWalConn<'pg> {
this_timeline.get().set_info(&my_info);
/* Need to persist our vote first */
this_timeline.get().save_control_file(true)?;
this_timeline.get().set_info(&my_info);
let mut flushed_restart_lsn = Lsn(0);
let wal_seg_size = server_info.wal_seg_size as usize;

View File

@@ -76,8 +76,12 @@ impl ReplicationConn {
let feedback = HotStandbyFeedback::des(&m)?;
subscriber.add_hs_feedback(feedback);
}
FeMessage::Sync => {}
FeMessage::CopyFailed => {
return Err(anyhow!("Copy failed"))
}
_ => {
// We only handle `CopyData` messages. Anything else is ignored.
// We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored.
info!("unexpected message {:?}", msg);
}
}

View File

@@ -340,7 +340,7 @@ impl PostgresBackend {
// We prefer explicit pattern matching to wildcards, because
// this helps us spot the places where new variants are missing
FeMessage::CopyData(_) | FeMessage::CopyDone => {
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => {
bail!("unexpected message type: {:?}", msg);
}
}

View File

@@ -31,6 +31,7 @@ pub enum FeMessage {
Terminate,
CopyData(Bytes),
CopyDone,
CopyFailed,
PasswordMessage(Bytes),
}
@@ -138,6 +139,7 @@ impl FeMessage {
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => Ok(Some(FeMessage::CopyData(body))),
b'c' => Ok(Some(FeMessage::CopyDone)),
b'f' => Ok(Some(FeMessage::CopyFailed)),
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
}
@@ -338,6 +340,7 @@ pub enum BeMessage<'a> {
ControlFile,
CopyData(&'a [u8]),
CopyDone,
CopyFailed,
CopyInResponse,
CopyOutResponse,
CopyBothResponse,
@@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> {
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyFailed => {
buf.put_u8(b'f');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyInResponse => {
buf.put_u8(b'G');
write_body(buf, |buf| {