From ab33614ab166d53d0a35837f2e9103f041378f9a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 10 Sep 2021 10:59:09 +0300 Subject: [PATCH] Forbid adding WAL to the repository after advancing last record LSN. When you advance last record LSN, *all* changes up to that LSN should be imported into repository. We have been a bit sloppy about that when it comes to the checkpoint information that we also store in the repository. In WAL receiver, for example, we would receive a WAL record, advance last record LSN, and only then update the checkpoint relish at the same LSN. Reorder that so that you advance the last record LSN only after updating the checkpoint relish. It hasn't apparently caused any problems so far, but let's be tidy. Tighten the check for that in get_layer_for_write(), so that it checks for 'lsn > last_record_lsn' rather than 'lsn >= last_record_lsn'. --- pageserver/src/branches.rs | 2 + pageserver/src/layered_repository.rs | 8 +--- pageserver/src/repository.rs | 6 +-- pageserver/src/restore_local_repo.rs | 66 +++++++++++++++++++++------- pageserver/src/walreceiver.rs | 6 ++- 5 files changed, 62 insertions(+), 26 deletions(-) diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index dd01615eb9..a2e554513e 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -229,6 +229,8 @@ fn bootstrap_timeline( let wal_dir = pgdata_path.join("pg_wal"); restore_local_repo::import_timeline_wal(&wal_dir, &*timeline, lsn)?; + timeline.checkpoint()?; + println!( "created initial timeline {} timeline.lsn {}", tli, diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index afd955b11e..b44f113d74 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -863,11 +863,7 @@ impl Timeline for LayeredTimeline { fn advance_last_record_lsn(&self, new_lsn: Lsn) { assert!(new_lsn.is_aligned()); - let old_lsn = self.last_record_lsn.advance(new_lsn); - - // since we are align incoming LSN's we can't have delta less - // then 0x8 - assert!(old_lsn == new_lsn || (new_lsn.0 - old_lsn.0 >= 0x8)); + self.last_record_lsn.advance(new_lsn); } fn get_last_record_lsn(&self) -> Lsn { @@ -1108,7 +1104,7 @@ impl LayeredTimeline { assert!(lsn.is_aligned()); let last_record_lsn = self.get_last_record_lsn(); - if lsn < last_record_lsn { + if lsn <= last_record_lsn { panic!( "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})", lsn, diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 044eb5e721..56f9cba5ba 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -373,9 +373,9 @@ mod tests { ); // Truncate to zero length - tline.put_truncation(TESTREL_A, Lsn(0x60), 0)?; - tline.advance_last_record_lsn(Lsn(0x60)); - assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 0); + tline.put_truncation(TESTREL_A, Lsn(0x68), 0)?; + tline.advance_last_record_lsn(Lsn(0x68)); + assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x68))?.unwrap(), 0); // Extend from 0 to 2 blocks, leaving a gap tline.put_page_image(TESTREL_A, 1, Lsn(0x70), TEST_IMG("foo blk 1"))?; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index f0ef3a3443..fa43c5376a 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -46,12 +46,7 @@ pub fn import_timeline_from_postgres_datadir( None => continue, Some("pg_control") => { - import_nonrel_file(timeline, lsn, RelishTag::ControlFile, &direntry.path())?; - // Extract checkpoint record from pg_control and store is as separate object - let pg_control_bytes = timeline.get_page_at_lsn(RelishTag::ControlFile, 0, lsn)?; - let pg_control = ControlFileData::decode(&pg_control_bytes)?; - let checkpoint_bytes = pg_control.checkPointCopy.encode(); - timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, checkpoint_bytes)?; + import_control_file(timeline, lsn, &direntry.path())?; } Some("pg_filenode.map") => import_nonrel_file( timeline, @@ -132,8 +127,7 @@ pub fn import_timeline_from_postgres_datadir( } // TODO: Scan pg_tblspc - timeline.advance_last_record_lsn(lsn.align()); - timeline.checkpoint()?; + timeline.advance_last_record_lsn(lsn); Ok(()) } @@ -215,6 +209,35 @@ fn import_nonrel_file( Ok(()) } +/// +/// Import pg_control file into the repository. +/// +/// The control file is imported as is, but we also extract the checkpoint record +/// from it and store it separated. +fn import_control_file(timeline: &dyn Timeline, lsn: Lsn, path: &Path) -> Result<()> { + let mut file = File::open(path)?; + let mut buffer = Vec::new(); + // read the whole file + file.read_to_end(&mut buffer)?; + + info!("importing control file {}", path.display()); + + // Import it as ControlFile + timeline.put_page_image( + RelishTag::ControlFile, + 0, + lsn, + Bytes::copy_from_slice(&buffer[..]), + )?; + + // Extract the checkpoint record and import it separately. + let pg_control = ControlFileData::decode(&buffer)?; + let checkpoint_bytes = pg_control.checkPointCopy.encode(); + timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, checkpoint_bytes)?; + + Ok(()) +} + /// /// Import an SLRU segment file /// @@ -316,6 +339,13 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: break; } if let Some((lsn, recdata)) = rec.unwrap() { + // The previous record has been handled, let the repository know that + // it is up-to-date to this LSN. (We do this here on the "next" iteration, + // rather than right after the save_decoded_record, because at the end of + // the WAL, we will also need to perform the update of the checkpoint data + // with the same LSN as the last actual record.) + timeline.advance_last_record_lsn(last_lsn); + let decoded = decode_wal_record(recdata.clone()); save_decoded_record(&mut checkpoint, timeline, &decoded, recdata, lsn)?; last_lsn = lsn; @@ -331,12 +361,19 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: offset = 0; } - info!("reached end of WAL at {}", last_lsn); - let checkpoint_bytes = checkpoint.encode(); - timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?; + if last_lsn != startpoint { + info!( + "reached end of WAL at {}, updating checkpoint info", + last_lsn + ); + let checkpoint_bytes = checkpoint.encode(); + timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?; + + timeline.advance_last_record_lsn(last_lsn); + } else { + info!("no WAL to import at {}", last_lsn); + } - timeline.advance_last_record_lsn(last_lsn.align()); - timeline.checkpoint()?; Ok(()) } @@ -535,9 +572,6 @@ pub fn save_decoded_record( } } } - // Now that this record has been handled, let the repository know that - // it is up-to-date to this LSN - timeline.advance_last_record_lsn(lsn.align()); Ok(()) } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 62761626f1..69721be480 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -223,7 +223,6 @@ fn walreceiver_main( recdata, lsn, )?; - last_rec_lsn = lsn; let new_checkpoint_bytes = checkpoint.encode(); // Check if checkpoint data was updated by save_decoded_record @@ -235,6 +234,11 @@ fn walreceiver_main( new_checkpoint_bytes, )?; } + + // Now that this record has been fully handled, including updating the + // checkpoint data, let the repository know that it is up-to-date to this LSN + timeline.advance_last_record_lsn(lsn); + last_rec_lsn = lsn; } // Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each),