diff --git a/Cargo.lock b/Cargo.lock index 67c00293b0..5c2eadf133 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2283,6 +2283,7 @@ name = "walkeeper" version = "0.1.0" dependencies = [ "anyhow", + "bincode", "byteorder", "bytes", "clap", diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 31b8c40d4e..3946f85606 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -714,6 +714,41 @@ impl Timeline for ObjectTimeline { let iter = self.obj_store.objects(self.timelineid, lsn)?; Ok(Box::new(ObjectHistory { lsn, iter })) } + + // + // Wait until WAL has been received up to the given LSN. + // + fn wait_lsn(&self, req_lsn: Lsn) -> Result { + let mut lsn = req_lsn; + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. + if lsn == Lsn(0) { + let last_valid_lsn = self.last_valid_lsn.load(); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + last_valid_lsn, + lsn + ); + lsn = last_valid_lsn; + } + trace!( + "Start waiting for LSN {}, valid LSN is {}", + lsn, + self.last_valid_lsn.load() + ); + self.last_valid_lsn + .wait_for_timeout(lsn, TIMEOUT) + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}", + lsn, + self.last_valid_lsn.load(), + ) + })?; + //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); + + Ok(lsn) + } } impl ObjectTimeline { @@ -822,40 +857,6 @@ impl ObjectTimeline { } } - // - // Wait until WAL has been received up to the given LSN. - // - fn wait_lsn(&self, mut lsn: Lsn) -> Result { - // When invalid LSN is requested, it means "don't wait, return latest version of the page" - // This is necessary for bootstrap. - if lsn == Lsn(0) { - let last_valid_lsn = self.last_valid_lsn.load(); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - last_valid_lsn, - lsn - ); - lsn = last_valid_lsn; - } - trace!( - "Start waiting for LSN {}, valid LSN is {}", - lsn, - self.last_valid_lsn.load() - ); - self.last_valid_lsn - .wait_for_timeout(lsn, TIMEOUT) - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}", - lsn, - self.last_valid_lsn.load(), - ) - })?; - //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); - - Ok(lsn) - } - /// /// Iterate through object versions with given key, in reverse LSN order. /// diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2caffb1eb2..c9f9591582 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -357,8 +357,13 @@ impl PageServerHandler { /* Send a tarball of the latest snapshot on the timeline */ - let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); - + let req_lsn = match lsn { + Some(lsn) => { + timeline.wait_lsn(lsn)?; + lsn + } + None => timeline.get_last_valid_lsn() + }; { let mut writer = CopyDataSink { pgb }; let mut basebackup = basebackup::Basebackup::new( @@ -468,8 +473,9 @@ impl postgres_backend::Handler for PageServerHandler { } else if query_string.starts_with("basebackup ") { let (_, params_raw) = query_string.split_at("basebackup ".len()); let params = params_raw.split(" ").collect::>(); + info!("params.len()={}, params[2].len()={}", params.len(), params[2].len()); ensure!( - params.len() == 2, + params.len() >= 2, "invalid param number for basebackup command" ); @@ -479,7 +485,7 @@ impl postgres_backend::Handler for PageServerHandler { self.check_permission(Some(tenantid))?; // TODO are there any tests with lsn option? - let lsn = if params.len() == 3 { + let lsn = if params.len() == 3 && params[2].len() != 0 { Some(Lsn::from_str(params[2])?) } else { None @@ -575,6 +581,10 @@ impl postgres_backend::Handler for PageServerHandler { timeline.advance_last_valid_lsn(last_lsn); break; } + FeMessage::CopyFailed => { + info!("Copy failed"); + break; + } FeMessage::Sync => {} _ => bail!("unexpected message {:?}", msg), } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index bb6388a532..74f603bb64 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -158,6 +158,11 @@ pub trait Timeline: Send + Sync { /// Relation size is increased implicitly and decreased with Truncate updates. // TODO ordering guarantee? fn history<'a>(&'a self) -> Result>; + + // + // Wait until WAL has been received up to the given LSN. + // + fn wait_lsn(&self, lsn: Lsn) -> Result; } pub trait History: Iterator> { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 8ae690a251..fd23258e7f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -207,7 +207,7 @@ fn walreceiver_main( waldecoder.feed_bytes(data); - while let Some((lsn, recdata)) = waldecoder.poll_decode()? { + while let Ok(Some((lsn, recdata))) = waldecoder.poll_decode() { // Save old checkpoint value to compare with it after decoding WAL record let old_checkpoint_bytes = checkpoint.encode(); let decoded = decode_wal_record(recdata.clone()); diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 89364f9147..fd3f8b2069 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -214,10 +214,9 @@ pub const PGDATA_SUBDIRS: [&'static str; 22] = [ "pg_logical/mappings", ]; -pub const PGDATA_SPECIAL_FILES: [&'static str; 4] = [ +pub const PGDATA_SPECIAL_FILES: [&'static str; 3] = [ "pg_hba.conf", "pg_ident.conf", - "postgresql.conf", "postgresql.auto.conf", ]; diff --git a/vendor/postgres b/vendor/postgres index 04cfa326a5..3ca60b1989 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 04cfa326a543171967c16954306f5a9dd8a470ea +Subproject commit 3ca60b19890de1d202528884533f28065eb42f1a diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 9483a2c6cb..741c8eb2cc 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -299,6 +299,7 @@ impl<'pg> ReceiveWalConn<'pg> { this_timeline.get().set_info(&my_info); /* Need to persist our vote first */ this_timeline.get().save_control_file(true)?; + this_timeline.get().set_info(&my_info); let mut flushed_restart_lsn = Lsn(0); let wal_seg_size = server_info.wal_seg_size as usize; diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 907e4868cd..cabd706e0d 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -76,8 +76,12 @@ impl ReplicationConn { let feedback = HotStandbyFeedback::des(&m)?; subscriber.add_hs_feedback(feedback); } + FeMessage::Sync => {} + FeMessage::CopyFailed => { + return Err(anyhow!("Copy failed")) + } _ => { - // We only handle `CopyData` messages. Anything else is ignored. + // We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored. info!("unexpected message {:?}", msg); } } diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index d4de084442..091ddb2f1c 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -340,7 +340,7 @@ impl PostgresBackend { // We prefer explicit pattern matching to wildcards, because // this helps us spot the places where new variants are missing - FeMessage::CopyData(_) | FeMessage::CopyDone => { + FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => { bail!("unexpected message type: {:?}", msg); } } diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index 780474a4b9..f206e5e85f 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -31,6 +31,7 @@ pub enum FeMessage { Terminate, CopyData(Bytes), CopyDone, + CopyFailed, PasswordMessage(Bytes), } @@ -138,6 +139,7 @@ impl FeMessage { b'X' => Ok(Some(FeMessage::Terminate)), b'd' => Ok(Some(FeMessage::CopyData(body))), b'c' => Ok(Some(FeMessage::CopyDone)), + b'f' => Ok(Some(FeMessage::CopyFailed)), b'p' => Ok(Some(FeMessage::PasswordMessage(body))), tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)), } @@ -338,6 +340,7 @@ pub enum BeMessage<'a> { ControlFile, CopyData(&'a [u8]), CopyDone, + CopyFailed, CopyInResponse, CopyOutResponse, CopyBothResponse, @@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> { write_body(buf, |_| Ok::<(), io::Error>(())).unwrap(); } + BeMessage::CopyFailed => { + buf.put_u8(b'f'); + write_body(buf, |_| Ok::<(), io::Error>(())).unwrap(); + } + BeMessage::CopyInResponse => { buf.put_u8(b'G'); write_body(buf, |buf| {