From e272a380b467ff69f1f0a90ef3e37e63739febb5 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 19 Oct 2021 09:48:04 +0300 Subject: [PATCH] On new repo, start writing WAL only after the initial checkpoint record. Previously, the first WAL record on the 'main' branch overwrote the initial checkpoint record, with invalid 'xl_prev'. That's harmless, but also pretty ugly. I bumped into this while I was trying to tighten up the checks for when a valid 'prev_lsn' is required. With this patch, the first WAL record gets a valid 'xl_prev' value. It doesn't matter much currently, but let's be tidy. --- pageserver/src/restore_local_repo.rs | 128 +++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 6 deletions(-) diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 60eb9ce278..8afa2676e2 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -7,10 +7,10 @@ use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment; use std::cmp::min; use std::fs; use std::fs::File; -use std::io::Read; -use std::path::Path; +use std::io::{Read, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use bytes::{Buf, Bytes}; use tracing::*; @@ -37,6 +37,8 @@ pub fn import_timeline_from_postgres_datadir( writer: &dyn TimelineWriter, lsn: Lsn, ) -> Result<()> { + let mut pg_control: Option = None; + // Scan 'global' for direntry in fs::read_dir(path.join("global"))? { let direntry = direntry?; @@ -44,7 +46,7 @@ pub fn import_timeline_from_postgres_datadir( None => continue, Some("pg_control") => { - import_control_file(writer, lsn, &direntry.path())?; + pg_control = Some(import_control_file(writer, lsn, &direntry.path())?); } Some("pg_filenode.map") => import_nonrel_file( writer, @@ -127,6 +129,18 @@ pub fn import_timeline_from_postgres_datadir( writer.advance_last_record_lsn(lsn); + // Import WAL. 
This is needed even when starting from a shutdown checkpoint, because + // this reads the checkpoint record itself, advancing the tip of the timeline to + // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn' + let pg_control = pg_control.ok_or_else(|| anyhow!("pg_control file not found"))?; + import_wal( + &path.join("pg_wal"), + writer, + Lsn(pg_control.checkPointCopy.redo), + lsn, + &mut pg_control.checkPointCopy.clone(), + )?; + Ok(()) } @@ -212,7 +226,11 @@ fn import_nonrel_file( /// /// The control file is imported as is, but we also extract the checkpoint record /// from it and store it separated. -fn import_control_file(timeline: &dyn TimelineWriter, lsn: Lsn, path: &Path) -> Result<()> { +fn import_control_file( + timeline: &dyn TimelineWriter, + lsn: Lsn, + path: &Path, +) -> Result { let mut file = File::open(path)?; let mut buffer = Vec::new(); // read the whole file @@ -233,7 +251,7 @@ fn import_control_file(timeline: &dyn TimelineWriter, lsn: Lsn, path: &Path) -> let checkpoint_bytes = pg_control.checkPointCopy.encode(); timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, checkpoint_bytes)?; - Ok(()) + Ok(pg_control) } /// @@ -285,6 +303,104 @@ fn import_slru_file( Ok(()) } +/// Scan PostgreSQL WAL files in given directory and load all records between +/// 'startpoint' and 'endpoint' into the repository. 
+///
+/// Segment files are read one at a time, beginning with the segment and in-segment offset
+/// computed from 'startpoint'. A segment is looked up under its plain name first and under
+/// the same name with a ".partial" suffix if the plain file does not exist.
+///
+/// Every decoded record is passed to `save_decoded_record`; when that call reports that the
+/// checkpoint was modified, the re-encoded checkpoint is stored at the record's LSN. The
+/// timeline's last-record LSN is advanced after each record.
+///
+/// When the decoder has no further record to return from the bytes fed so far and 'endpoint'
+/// has not been passed yet, the next segment file is opened. If that file cannot be opened,
+/// the I/O error is returned to the caller.
+fn import_wal(
+    walpath: &Path,
+    timeline: &dyn TimelineWriter,
+    startpoint: Lsn,
+    endpoint: Lsn,
+    checkpoint: &mut CheckPoint,
+) -> Result<()> {
+    let mut waldecoder = WalStreamDecoder::new(startpoint);
+
+    let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
+    let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
+    let mut last_lsn = startpoint;
+
+    while last_lsn <= endpoint {
+        // FIXME: assume postgresql tli 1 for now
+        let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
+        let mut buf = Vec::new();
+
+        // Read local file
+        let mut path: PathBuf = walpath.join(&filename);
+
+        // It could be as .partial
+        if !path.exists() {
+            path = walpath.join(filename + ".partial");
+        }
+
+        // Slurp the WAL file
+        let mut file = File::open(&path)?;
+
+        // Only the first segment is read from the middle; 'offset' is reset to 0 at the
+        // bottom of this loop, so later segments are read from their beginning.
+        if offset > 0 {
+            file.seek(SeekFrom::Start(offset as u64))?;
+        }
+
+        let nread = file.read_to_end(&mut buf)?;
+        if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize {
+            // Maybe allow this for .partial files?
+            error!("read only {} bytes from WAL file", nread);
+        }
+
+        waldecoder.feed_bytes(&buf);
+
+        let mut nrecords = 0;
+        while last_lsn <= endpoint {
+            let (lsn, recdata) = match waldecoder.poll_decode()? {
+                Some(record) => record,
+                // No further record can be decoded from the bytes fed so far. Leave the
+                // inner loop so that the outer loop feeds the following segment; polling
+                // again without new input would spin here forever.
+                None => break,
+            };
+
+            let mut checkpoint_modified = false;
+
+            let decoded = decode_wal_record(recdata.clone());
+            save_decoded_record(
+                checkpoint,
+                &mut checkpoint_modified,
+                timeline,
+                &decoded,
+                recdata,
+                lsn,
+            )?;
+            last_lsn = lsn;
+
+            if checkpoint_modified {
+                let checkpoint_bytes = checkpoint.encode();
+                timeline.put_page_image(
+                    RelishTag::Checkpoint,
+                    0,
+                    last_lsn,
+                    checkpoint_bytes,
+                )?;
+            }
+
+            // Now that this record has been fully handled, including updating the
+            // checkpoint data, let the repository know that it is up-to-date to this LSN
+            timeline.advance_last_record_lsn(last_lsn);
+            nrecords += 1;
+
+            trace!("imported record at {} (end {})", lsn, endpoint);
+        }
+
+        debug!("imported {} records up to {}", nrecords, last_lsn);
+
+        segno += 1;
+        offset = 0;
+    }
+
+    if last_lsn != startpoint {
+        debug!(
+            "reached end of WAL at {}, updating checkpoint info",
+            last_lsn
+        );
+
+        timeline.advance_last_record_lsn(last_lsn);
+    } else {
+        info!("no WAL to import at {}", last_lsn);
+    }
+
+    Ok(())
+}
+
 ///
 /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
 /// relations/pages that the record affects.