diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 79e69caba5..d2698a7022 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -220,23 +220,23 @@ impl<'a> Basebackup<'a> { let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; - // Here starts pg_resetwal inspired magic // Generate new pg_control and WAL needed for bootstrap - let new_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE) + 1; - - let new_lsn = XLogSegNoOffsetToRecPtr( - new_segno, + let checkpoint_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); + let checkpoint_lsn = XLogSegNoOffsetToRecPtr( + checkpoint_segno, XLOG_SIZE_OF_XLOG_LONG_PHD as u32, pg_constants::WAL_SEGMENT_SIZE, ); - checkpoint.redo = new_lsn; + checkpoint.redo = self.lsn.0 + self.lsn.calc_padding(8u32); //reset some fields we don't want to preserve checkpoint.oldestActiveXid = 0; //save new values in pg_control - pg_control.checkPoint = new_lsn; + pg_control.checkPoint = checkpoint_lsn; pg_control.checkPointCopy = checkpoint; + info!("pg_control.state = {}", pg_control.state); + pg_control.state = pg_constants::DB_SHUTDOWNED; //send pg_control let pg_control_bytes = pg_control.encode(); @@ -246,7 +246,7 @@ impl<'a> Basebackup<'a> { //send wal segment let wal_file_name = XLogFileName( 1, // FIXME: always use Postgres timeline 1 - new_segno, + checkpoint_segno, pg_constants::WAL_SEGMENT_SIZE, ); let wal_file_path = format!("pg_wal/{}", wal_file_name); diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 431f5a3ed4..394fd2ad4c 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -478,7 +478,10 @@ impl Timeline for ObjectTimeline { let val = ObjectValue::RelationSize(new_nblocks); trace!( "Extended relation {} from {} to {} blocks at {}", - tag.rel, old_nblocks, new_nblocks, lsn + tag.rel, + old_nblocks, + new_nblocks, + lsn ); self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; @@ -556,16 +559,16 @@ impl Timeline for ObjectTimeline { fn advance_last_record_lsn(&self, lsn: Lsn) { // Can't move backwards. let old = self.last_record_lsn.fetch_max(lsn); - if old <= lsn { - // Also advance last_valid_lsn - let old = self.last_valid_lsn.advance(lsn); - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last record LSN backwards (was {}, new {})", - old, lsn - ); - } + assert!(old <= lsn); + + // Also advance last_valid_lsn + let old = self.last_valid_lsn.advance(lsn); + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last record LSN backwards (was {}, new {})", + old, lsn + ); } } fn get_last_record_lsn(&self) -> Lsn { diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index aa2adf969d..8ff56dfe8e 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -334,7 +334,6 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: // Assume that an error means we've reached the end of // a partial WAL record. So that's ok. trace!("WAL decoder error {:?}", rec); - waldecoder.set_position(Lsn((segno + 1) * pg_constants::WAL_SEGMENT_SIZE as u64)); break; } if let Some((lsn, recdata)) = rec.unwrap() { diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index b0c4be8133..0376e534a3 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -65,13 +65,6 @@ impl WalStreamDecoder { } } - pub fn set_position(&mut self, lsn: Lsn) { - self.lsn = lsn; - self.contlen = 0; - self.padlen = 0; - self.inputbuf.clear(); - } - pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); } @@ -97,12 +90,10 @@ impl WalStreamDecoder { let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf); if hdr.std.xlp_pageaddr != self.lsn.0 { - info!( - "Receive page with LSN {} instead of expected {}", - Lsn(hdr.std.xlp_pageaddr), - self.lsn - ); - self.lsn = Lsn(hdr.std.xlp_pageaddr); + return Err(WalDecodeError { + msg: "invalid xlog segment header".into(), + lsn: self.lsn, + }); } // TODO: verify the remaining fields in the header diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 1d98a0f6fc..054f89a97c 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -150,12 +150,12 @@ fn walreceiver_main( error!("No previous WAL position"); } - // FIXME: We have to do it to handle new segment generated by pg_resetwal at compute node startup. - // This hack is needed only when pageserver is connected directly to compute node, not through walkeeper. - startpoint = Lsn::max(startpoint, end_of_wal); - - // Always start from the beginning of segment - startpoint = Lsn(startpoint.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)); + // There might be some padding after the last full record, skip it. + // + // FIXME: It probably would be better to always start streaming from the beginning + // of the page, or the segment, so that we could check the page/segment headers + // too. Just for the sake of paranoia. + startpoint += startpoint.calc_padding(8u32); debug!( "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index a9af87eac6..ab28511e37 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -87,6 +87,7 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; pub const XLOG_NEXTOID: u8 = 0x30; pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; +pub const DB_SHUTDOWNED: u32 = 1; // From multixact.h pub const FIRST_MULTIXACT_ID: u32 = 1; diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index dd58f2ef25..bc4a30a423 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -399,7 +399,7 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { xlp_magic: XLOG_PAGE_MAGIC as u16, xlp_info: pg_constants::XLP_LONG_HEADER, xlp_tli: 1, // FIXME: always use Postgres timeline 1 - xlp_pageaddr: pg_control.checkPointCopy.redo - XLOG_SIZE_OF_XLOG_LONG_PHD as u64, + xlp_pageaddr: pg_control.checkPoint - XLOG_SIZE_OF_XLOG_LONG_PHD as u64, xlp_rem_len: 0, } }, diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index d11842e35f..dc3be94bfe 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -19,7 +19,7 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin): cur.execute("INSERT INTO foo VALUES ('bar')") # Stop and restart the Postgres instance - pg.stop().start() + pg.stop_and_destroy().create_start('test_restart_compute') with closing(pg.connect()) as conn: with conn.cursor() as cur: diff --git a/vendor/postgres b/vendor/postgres index 928beb4241..044647e499 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 928beb4241d7dcf542928195f59baa5eee18ac1e +Subproject commit 044647e49971afd96a77f77c65cfa4532543a51d diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 5094a256f5..0ca5a6ff9b 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -247,16 +247,6 @@ impl ReceiveWalConn { /* Calculate WAL end based on local data */ let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true); my_info.flush_lsn = flush_lsn; - - // FIXME: Yet another trick to handle creation of new WAL segment at - // compute node startup (a-la pg_resetwal). - // If restart_lsn was not adjusted then walproposer will try to perform recovery - // because restart_lsn != flush_lsn and fail because first WAL segment is missed. - // May be it is better to handle it by wal proposer, but it will contradict with wal_proposer - // usage in other branches. - // This adjustment is needed only for first segment. - let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2); - my_info.restart_lsn = Lsn::max(my_info.restart_lsn, min_lsn); my_info.server.timeline = timeline; /* Report my identifier to proposer */ diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 0605f212c7..024662d582 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -234,10 +234,7 @@ impl TimelineTools for Option> { let seg_size = self.get().get_info().server.wal_seg_size as usize; assert!(seg_size > 0); let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise); - // FIXME: because of generation of new segment at compute node start we just do not have first WAL segment - let wal_start = Lsn((seg_size * 2) as u64); - let lsn = Lsn::max(Lsn(lsn), wal_start); - (lsn, timeline) + (Lsn(lsn), timeline) } }