Forbid adding WAL to the repository after advancing last record LSN.

When you advance last record LSN, *all* changes up to that LSN should be
imported into repository. We have been a bit sloppy about that when it
comes to the checkpoint information that we also store in the repository.
In WAL receiver, for example, we would receive a WAL record, advance
last record LSN, and only then update the checkpoint relish at the same
LSN. Reorder that so that you advance the last record LSN only after
updating the checkpoint relish. It hasn't apparently caused any problems
so far, but let's be tidy.

Tighten the check for that in get_layer_for_write(), so that it checks for
'lsn > last_record_lsn' rather than 'lsn >= last_record_lsn'.
This commit is contained in:
Heikki Linnakangas
2021-09-10 10:59:09 +03:00
parent 03dff207db
commit ab33614ab1
5 changed files with 62 additions and 26 deletions

View File

@@ -229,6 +229,8 @@ fn bootstrap_timeline(
let wal_dir = pgdata_path.join("pg_wal");
restore_local_repo::import_timeline_wal(&wal_dir, &*timeline, lsn)?;
timeline.checkpoint()?;
println!(
"created initial timeline {} timeline.lsn {}",
tli,

View File

@@ -863,11 +863,7 @@ impl Timeline for LayeredTimeline {
fn advance_last_record_lsn(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
let old_lsn = self.last_record_lsn.advance(new_lsn);
// since we are align incoming LSN's we can't have delta less
// then 0x8
assert!(old_lsn == new_lsn || (new_lsn.0 - old_lsn.0 >= 0x8));
self.last_record_lsn.advance(new_lsn);
}
fn get_last_record_lsn(&self) -> Lsn {
@@ -1108,7 +1104,7 @@ impl LayeredTimeline {
assert!(lsn.is_aligned());
let last_record_lsn = self.get_last_record_lsn();
if lsn < last_record_lsn {
if lsn <= last_record_lsn {
panic!(
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,

View File

@@ -373,9 +373,9 @@ mod tests {
);
// Truncate to zero length
tline.put_truncation(TESTREL_A, Lsn(0x60), 0)?;
tline.advance_last_record_lsn(Lsn(0x60));
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 0);
tline.put_truncation(TESTREL_A, Lsn(0x68), 0)?;
tline.advance_last_record_lsn(Lsn(0x68));
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x68))?.unwrap(), 0);
// Extend from 0 to 2 blocks, leaving a gap
tline.put_page_image(TESTREL_A, 1, Lsn(0x70), TEST_IMG("foo blk 1"))?;

View File

@@ -46,12 +46,7 @@ pub fn import_timeline_from_postgres_datadir(
None => continue,
Some("pg_control") => {
import_nonrel_file(timeline, lsn, RelishTag::ControlFile, &direntry.path())?;
// Extract checkpoint record from pg_control and store is as separate object
let pg_control_bytes = timeline.get_page_at_lsn(RelishTag::ControlFile, 0, lsn)?;
let pg_control = ControlFileData::decode(&pg_control_bytes)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode();
timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, checkpoint_bytes)?;
import_control_file(timeline, lsn, &direntry.path())?;
}
Some("pg_filenode.map") => import_nonrel_file(
timeline,
@@ -132,8 +127,7 @@ pub fn import_timeline_from_postgres_datadir(
}
// TODO: Scan pg_tblspc
timeline.advance_last_record_lsn(lsn.align());
timeline.checkpoint()?;
timeline.advance_last_record_lsn(lsn);
Ok(())
}
@@ -215,6 +209,35 @@ fn import_nonrel_file(
Ok(())
}
///
/// Import pg_control file into the repository.
///
/// The control file is imported as is, but we also extract the checkpoint record
/// from it and store it separated.
fn import_control_file(timeline: &dyn Timeline, lsn: Lsn, path: &Path) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
info!("importing control file {}", path.display());
// Import it as ControlFile
timeline.put_page_image(
RelishTag::ControlFile,
0,
lsn,
Bytes::copy_from_slice(&buffer[..]),
)?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode();
timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, checkpoint_bytes)?;
Ok(())
}
///
/// Import an SLRU segment file
///
@@ -316,6 +339,13 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
break;
}
if let Some((lsn, recdata)) = rec.unwrap() {
// The previous record has been handled, let the repository know that
// it is up-to-date to this LSN. (We do this here on the "next" iteration,
// rather than right after the save_decoded_record, because at the end of
// the WAL, we will also need to perform the update of the checkpoint data
// with the same LSN as the last actual record.)
timeline.advance_last_record_lsn(last_lsn);
let decoded = decode_wal_record(recdata.clone());
save_decoded_record(&mut checkpoint, timeline, &decoded, recdata, lsn)?;
last_lsn = lsn;
@@ -331,12 +361,19 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
offset = 0;
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?;
if last_lsn != startpoint {
info!(
"reached end of WAL at {}, updating checkpoint info",
last_lsn
);
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?;
timeline.advance_last_record_lsn(last_lsn);
} else {
info!("no WAL to import at {}", last_lsn);
}
timeline.advance_last_record_lsn(last_lsn.align());
timeline.checkpoint()?;
Ok(())
}
@@ -535,9 +572,6 @@ pub fn save_decoded_record(
}
}
}
// Now that this record has been handled, let the repository know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn.align());
Ok(())
}

View File

@@ -223,7 +223,6 @@ fn walreceiver_main(
recdata,
lsn,
)?;
last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
@@ -235,6 +234,11 @@ fn walreceiver_main(
new_checkpoint_bytes,
)?;
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
last_rec_lsn = lsn;
}
// Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each),