diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 4de6419444..7c992ddb0c 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1695,7 +1695,7 @@ impl LayeredTimeline { // // If we don't have a base image, then the oldest WAL record better initialize // the page - if data.page_img.is_none() && !data.records.first().unwrap().will_init { + if data.page_img.is_none() && !data.records.first().unwrap().1.will_init { // FIXME: this ought to be an error? warn!( "Base image for page {}/{} at {} not found, but got {} WAL records", @@ -1773,7 +1773,7 @@ impl Deref for LayeredTimelineWriter<'_> { } impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { - fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { + fn put_wal_record(&self, lsn: Lsn, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { if !rel.is_blocky() && blknum != 0 { bail!( "invalid request for block {} for non-blocky relish {}", @@ -1781,11 +1781,11 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { rel ); } - ensure!(rec.lsn.is_aligned(), "unaligned record LSN"); + ensure!(lsn.is_aligned(), "unaligned record LSN"); let seg = SegmentTag::from_blknum(rel, blknum); - let layer = self.tl.get_layer_for_write(seg, rec.lsn)?; - let delta_size = layer.put_wal_record(blknum, rec); + let layer = self.tl.get_layer_for_write(seg, lsn)?; + let delta_size = layer.put_wal_record(lsn, blknum, rec); self.tl .increase_current_logical_size(delta_size * BLCKSZ as u32); Ok(()) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index e93eddb7e6..24ed9d6e69 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -198,7 +198,7 @@ impl Layer for DeltaLayer { .slice_range((Included(&minkey), Included(&maxkey))) .iter() .rev(); - for ((_blknum, _lsn), blob_range) in iter { + for ((_blknum, pv_lsn), blob_range) in iter { let pv = PageVersion::des(&read_blob(&page_version_reader, blob_range)?)?; if let Some(img) = pv.page_image { @@ -208,7 +208,7 @@ impl Layer for DeltaLayer { break; } else if let Some(rec) = pv.record { let will_init = rec.will_init; - reconstruct_data.records.push(rec); + reconstruct_data.records.push((*pv_lsn, rec)); if will_init { // This WAL record initializes the page, so no need to go further back need_image = false; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index bd1860fd47..474eef09c4 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -160,13 +160,13 @@ impl Layer for InMemoryLayer { .get_block_lsn_range(blknum, ..=lsn) .iter() .rev(); - for (_entry_lsn, entry) in iter { + for (entry_lsn, entry) in iter { if let Some(img) = &entry.page_image { reconstruct_data.page_img = Some(img.clone()); need_image = false; break; } else if let Some(rec) = &entry.record { - reconstruct_data.records.push(rec.clone()); + reconstruct_data.records.push((*entry_lsn, rec.clone())); if rec.will_init { // This WAL record initializes the page, so no need to go further back need_image = false; @@ -337,10 +337,10 @@ impl InMemoryLayer { // Write operations /// Remember new page version, as a WAL record over previous version - pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> u32 { + pub fn put_wal_record(&self, lsn: Lsn, blknum: u32, rec: WALRecord) -> u32 { self.put_page_version( blknum, - rec.lsn, + lsn, PageVersion { page_image: None, record: Some(rec), diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index c49fbbdd99..0a86fe407d 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -78,7 +78,7 @@ pub struct PageVersion { /// 'records' contains the records to apply over the base image. /// pub struct PageReconstructData { - pub records: Vec, + pub records: Vec<(Lsn, WALRecord)>, pub page_img: Option, } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index f033c74c31..56e551a275 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -162,7 +162,7 @@ pub trait TimelineWriter: Deref { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - fn put_wal_record(&self, tag: RelishTag, blknum: u32, rec: WALRecord) -> Result<()>; + fn put_wal_record(&self, lsn: Lsn, tag: RelishTag, blknum: u32, rec: WALRecord) -> Result<()>; /// Like put_wal_record, but with ready-made image of the page. fn put_page_image(&self, tag: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()>; @@ -182,7 +182,6 @@ pub trait TimelineWriter: Deref { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WALRecord { - pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, pub rec: Bytes, // Remember the offset of main_data in rec, @@ -193,22 +192,19 @@ pub struct WALRecord { impl WALRecord { pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u64(self.lsn.0); buf.put_u8(self.will_init as u8); buf.put_u32(self.main_data_offset); buf.put_u32(self.rec.len() as u32); buf.put_slice(&self.rec[..]); } pub fn unpack(buf: &mut Bytes) -> WALRecord { - let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); - let mut dst = vec![0u8; buf.get_u32() as usize]; - buf.copy_to_slice(&mut dst); + let rec_len = buf.get_u32() as usize; + let rec = buf.split_to(rec_len); WALRecord { - lsn, will_init, - rec: Bytes::from(dst), + rec, main_data_offset, } } @@ -832,7 +828,7 @@ mod tests { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec, + records: Vec<(Lsn, WALRecord)>, ) -> Result { let s = format!( "redo for {} blk {} to get to {}, with {} and {} records", diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index bf4fb12d0a..60eb9ce278 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -312,13 +312,12 @@ pub fn save_decoded_record( }); let rec = WALRecord { - lsn, will_init: blk.will_init || blk.apply_image, rec: recdata.clone(), main_data_offset: decoded.main_data_offset as u32, }; - timeline.put_wal_record(tag, blk.blkno, rec)?; + timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; } let mut buf = decoded.record.clone(); @@ -656,12 +655,12 @@ fn save_xact_record( let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; let rec = WALRecord { - lsn, will_init: false, rec: decoded.record.clone(), main_data_offset: decoded.main_data_offset as u32, }; timeline.put_wal_record( + lsn, RelishTag::Slru { slru: SlruKind::Clog, segno, @@ -677,6 +676,7 @@ fn save_xact_record( let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; timeline.put_wal_record( + lsn, RelishTag::Slru { slru: SlruKind::Clog, segno, @@ -771,7 +771,6 @@ fn save_multixact_create_record( decoded: &DecodedWALRecord, ) -> Result<()> { let rec = WALRecord { - lsn, will_init: false, rec: decoded.record.clone(), main_data_offset: decoded.main_data_offset as u32, @@ -780,6 +779,7 @@ fn save_multixact_create_record( let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; timeline.put_wal_record( + lsn, RelishTag::Slru { slru: SlruKind::MultiXactOffsets, segno, @@ -799,6 +799,7 @@ fn save_multixact_create_record( let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; timeline.put_wal_record( + lsn, RelishTag::Slru { slru: SlruKind::MultiXactMembers, segno, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e9382c4da5..8cd696e8f3 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -82,7 +82,7 @@ pub trait WalRedoManager: Send + Sync { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec, + records: Vec<(Lsn, WALRecord)>, ) -> Result; } @@ -99,7 +99,7 @@ impl crate::walredo::WalRedoManager for DummyRedoManager { _blknum: u32, _lsn: Lsn, _base_img: Option, - _records: Vec, + _records: Vec<(Lsn, WALRecord)>, ) -> Result { Err(WalRedoError::InvalidState) } @@ -150,7 +150,7 @@ struct WalRedoRequest { lsn: Lsn, base_img: Option, - records: Vec, + records: Vec<(Lsn, WALRecord)>, } /// An error happened in WAL redo @@ -179,7 +179,7 @@ impl WalRedoManager for PostgresRedoManager { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec, + records: Vec<(Lsn, WALRecord)>, ) -> Result { let start_time; let lock_time; @@ -277,7 +277,7 @@ impl PostgresRedoManager { page.extend_from_slice(&ZERO_PAGE); } // Apply all collected WAL records - for record in records { + for (_lsn, record) in records { let mut buf = record.rec.clone(); WAL_REDO_RECORD_COUNTER.inc(); @@ -544,7 +544,7 @@ impl PostgresRedoProcess { &mut self, tag: BufferTag, base_img: Option, - records: &[WALRecord], + records: &[(Lsn, WALRecord)], ) -> Result { let stdout = &mut self.stdout; // Buffer the writes to avoid a lot of small syscalls. @@ -570,11 +570,11 @@ impl PostgresRedoProcess { } // Send WAL records. - for rec in records.iter() { + for (lsn, rec) in records.iter() { WAL_REDO_RECORD_COUNTER.inc(); stdin - .write_all(&build_apply_record_msg(rec.lsn, &rec.rec)) + .write_all(&build_apply_record_msg(*lsn, &rec.rec)) .await?; //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",