From 5133db44e191e71b99020f38e2bfe14966daff89 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 5 Aug 2022 16:28:59 +0300 Subject: [PATCH] Move relation size cache from WalIngest to DatadirTimeline (#2094) * Move relation sie cache to layered timeline * Fix obtaining current LSN for relation size cache * Resolve merge conflicts * Resolve merge conflicts * Reestore 'lsn' field in DatadirModification * adjust DatadirModification lsn in ingest_record * Fix formatting * Pass lsn to get_relsize * Fix merge conflict * Update pageserver/src/pgdatadir_mapping.rs Co-authored-by: Heikki Linnakangas * Update pageserver/src/pgdatadir_mapping.rs Co-authored-by: Heikki Linnakangas Co-authored-by: Heikki Linnakangas --- pageserver/src/import_datadir.rs | 16 +-- pageserver/src/layered_repository/timeline.rs | 47 ++++++- pageserver/src/pgdatadir_mapping.rs | 68 +++++++-- pageserver/src/walingest.rs | 133 ++++++++---------- .../src/walreceiver/walreceiver_connection.rs | 2 +- 5 files changed, 164 insertions(+), 102 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index ccfd83400a..7d1e8e43aa 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -37,7 +37,7 @@ pub fn import_timeline_from_postgres_datadir( // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn) // Then fishing out pg_control would be unnecessary - let mut modification = tline.begin_modification(); + let mut modification = tline.begin_modification(lsn); modification.init_empty()?; // Import all but pg_wal @@ -56,12 +56,12 @@ pub fn import_timeline_from_postgres_datadir( if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? { pg_control = Some(control_file); } - modification.flush(lsn)?; + modification.flush()?; } } // We're done importing all the data files. - modification.commit(lsn)?; + modification.commit()?; // We expect the Postgres server to be shut down cleanly. let pg_control = pg_control.context("pg_control file not found")?; @@ -267,7 +267,7 @@ fn import_wal( waldecoder.feed_bytes(&buf); let mut nrecords = 0; - let mut modification = tline.begin_modification(); + let mut modification = tline.begin_modification(endpoint); let mut decoded = DecodedWALRecord::default(); while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { @@ -301,7 +301,7 @@ pub fn import_basebackup_from_tar( base_lsn: Lsn, ) -> Result<()> { info!("importing base at {}", base_lsn); - let mut modification = tline.begin_modification(); + let mut modification = tline.begin_modification(base_lsn); modification.init_empty()?; let mut pg_control: Option = None; @@ -319,7 +319,7 @@ pub fn import_basebackup_from_tar( // We found the pg_control file. pg_control = Some(res); } - modification.flush(base_lsn)?; + modification.flush()?; } tar::EntryType::Directory => { debug!("directory {:?}", file_path); @@ -333,7 +333,7 @@ pub fn import_basebackup_from_tar( // sanity check: ensure that pg_control is loaded let _pg_control = pg_control.context("pg_control file not found")?; - modification.commit(base_lsn)?; + modification.commit()?; Ok(()) } @@ -385,7 +385,7 @@ pub fn import_wal_from_tar( waldecoder.feed_bytes(&bytes[offset..]); - let mut modification = tline.begin_modification(); + let mut modification = tline.begin_modification(end_lsn); let mut decoded = DecodedWALRecord::default(); while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 6ed1efd3d1..095f3d3861 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -8,7 +8,7 @@ use lazy_static::lazy_static; use tracing::*; use std::cmp::{max, min, Ordering}; -use std::collections::HashSet; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::fs; use std::fs::{File, OpenOptions}; use std::io::Write; @@ -38,7 +38,9 @@ use crate::layered_repository::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace}; +use crate::pgdatadir_mapping::BlockNumber; use crate::pgdatadir_mapping::LsnForTimestamp; +use crate::reltag::RelTag; use crate::tenant_config::TenantConfOpt; use crate::DatadirTimeline; @@ -295,6 +297,9 @@ pub struct LayeredTimeline { /// or None if WAL receiver has not received anything for this timeline /// yet. pub last_received_wal: Mutex>, + + /// Relation size cache + rel_size_cache: RwLock>, } pub struct WalReceiverInfo { @@ -306,7 +311,42 @@ pub struct WalReceiverInfo { /// Inherit all the functions from DatadirTimeline, to provide the /// functionality to store PostgreSQL relations, SLRUs, etc. in a /// LayeredTimeline. -impl DatadirTimeline for LayeredTimeline {} +impl DatadirTimeline for LayeredTimeline { + fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option { + let rel_size_cache = self.rel_size_cache.read().unwrap(); + if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) { + if lsn >= *cached_lsn { + return Some(*nblocks); + } + } + None + } + + fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { + let mut rel_size_cache = self.rel_size_cache.write().unwrap(); + match rel_size_cache.entry(tag) { + Entry::Occupied(mut entry) => { + let cached_lsn = entry.get_mut(); + if lsn >= cached_lsn.0 { + *cached_lsn = (lsn, nblocks); + } + } + Entry::Vacant(entry) => { + entry.insert((lsn, nblocks)); + } + } + } + + fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { + let mut rel_size_cache = self.rel_size_cache.write().unwrap(); + rel_size_cache.insert(tag, (lsn, nblocks)); + } + + fn remove_cached_rel_size(&self, tag: &RelTag) { + let mut rel_size_cache = self.rel_size_cache.write().unwrap(); + rel_size_cache.remove(tag); + } +} /// /// Information about how much history needs to be retained, needed by @@ -377,8 +417,6 @@ impl Timeline for LayeredTimeline { /// Look up the value with the given a key fn get(&self, key: Key, lsn: Lsn) -> Result { - debug_assert!(lsn <= self.get_last_record_lsn()); - // Check the page cache. We will get back the most recent page with lsn <= `lsn`. // The cached image can be returned directly if there is no WAL between the cached image // and requested LSN. The cached image can also be used to reduce the amount of WAL needed @@ -618,6 +656,7 @@ impl LayeredTimeline { repartition_threshold: 0, last_received_wal: Mutex::new(None), + rel_size_cache: RwLock::new(HashMap::new()), }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 61aca8d4ba..9097a08d05 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -56,13 +56,16 @@ pub trait DatadirTimeline: Timeline { /// This provides a transaction-like interface to perform a bunch /// of modifications atomically. /// - /// To ingest a WAL record, call begin_modification() to get a + /// To ingest a WAL record, call begin_modification(lsn) to get a /// DatadirModification object. Use the functions in the object to /// modify the repository state, updating all the pages and metadata - /// that the WAL record affects. When you're done, call commit(lsn) to - /// commit the changes. All the changes will be stamped with the specified LSN. + /// that the WAL record affects. When you're done, call commit() to + /// commit the changes. /// - /// Calling commit(lsn) will flush all the changes and reset the state, + /// Lsn stored in modification is advanced by `ingest_record` and + /// is used by `commit()` to update `last_record_lsn`. + /// + /// Calling commit() will flush all the changes and reset the state, /// so the `DatadirModification` struct can be reused to perform the next modification. /// /// Note that any pending modifications you make through the @@ -70,7 +73,7 @@ pub trait DatadirTimeline: Timeline { /// functions of the timeline until you finish! And if you update the /// same page twice, the last update wins. /// - fn begin_modification(&self) -> DatadirModification + fn begin_modification(&self, lsn: Lsn) -> DatadirModification where Self: Sized, { @@ -79,6 +82,7 @@ pub trait DatadirTimeline: Timeline { pending_updates: HashMap::new(), pending_deletions: Vec::new(), pending_nblocks: 0, + lsn, } } @@ -120,6 +124,10 @@ pub trait DatadirTimeline: Timeline { fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result { ensure!(tag.relnode != 0, "invalid relnode"); + if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { + return Ok(nblocks); + } + if (tag.forknum == pg_constants::FSM_FORKNUM || tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM) && !self.get_rel_exists(tag, lsn)? @@ -133,13 +141,21 @@ pub trait DatadirTimeline: Timeline { let key = rel_size_to_key(tag); let mut buf = self.get(key, lsn)?; - Ok(buf.get_u32_le()) + let nblocks = buf.get_u32_le(); + + // Update relation size cache + self.update_cached_rel_size(tag, lsn, nblocks); + Ok(nblocks) } /// Does relation exist? fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result { ensure!(tag.relnode != 0, "invalid relnode"); + // first try to lookup relation in cache + if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) { + return Ok(true); + } // fetch directory listing let key = rel_dir_to_key(tag.spcnode, tag.dbnode); let buf = self.get(key, lsn)?; @@ -445,6 +461,18 @@ pub trait DatadirTimeline: Timeline { Ok(result.to_keyspace()) } + + /// Get cached size of relation if it not updated after specified LSN + fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option; + + /// Update cached relation size if there is no more recent update + fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber); + + /// Store cached relation size + fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber); + + /// Remove cached relation size + fn remove_cached_rel_size(&self, tag: &RelTag); } /// DatadirModification represents an operation to ingest an atomic set of @@ -457,6 +485,9 @@ pub struct DatadirModification<'a, T: DatadirTimeline> { /// in the state in 'tline' yet. pub tline: &'a T, + /// Lsn assigned by begin_modification + pub lsn: Lsn, + // The modifications are not applied directly to the underlying key-value store. // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. @@ -666,9 +697,11 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { self.pending_nblocks += nblocks as isize; + // Update relation size cache + self.tline.set_cached_rel_size(rel, self.lsn, nblocks); + // Even if nblocks > 0, we don't insert any actual blocks here. That's up to the // caller. - Ok(()) } @@ -684,6 +717,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { let buf = nblocks.to_le_bytes(); self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); + // Update relation size cache + self.tline.set_cached_rel_size(rel, self.lsn, nblocks); + // Update logical database size. self.pending_nblocks -= old_size as isize - nblocks as isize; Ok(()) @@ -703,6 +739,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { let buf = nblocks.to_le_bytes(); self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); + // Update relation size cache + self.tline.set_cached_rel_size(rel, self.lsn, nblocks); + self.pending_nblocks += nblocks as isize - old_size as isize; } Ok(()) @@ -728,6 +767,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { let old_size = self.get(size_key)?.get_u32_le(); self.pending_nblocks -= old_size as isize; + // Remove enty from relation size cache + self.tline.remove_cached_rel_size(&rel); + // Delete size entry, as well as all blocks self.delete(rel_key_range(rel)); @@ -842,7 +884,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { /// retains all the metadata, but data pages are flushed. That's again OK /// for bulk import, where you are just loading data pages and won't try to /// modify the same pages twice. - pub fn flush(&mut self, lsn: Lsn) -> Result<()> { + pub fn flush(&mut self) -> Result<()> { // Unless we have accumulated a decent amount of changes, it's not worth it // to scan through the pending_updates list. let pending_nblocks = self.pending_nblocks; @@ -856,7 +898,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { let mut result: Result<()> = Ok(()); self.pending_updates.retain(|&key, value| { if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { - result = writer.put(key, lsn, value); + result = writer.put(key, self.lsn, value); false } else { true @@ -877,9 +919,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { /// underlying timeline. /// All the modifications in this atomic update are stamped by the specified LSN. /// - pub fn commit(&mut self, lsn: Lsn) -> Result<()> { + pub fn commit(&mut self) -> Result<()> { let writer = self.tline.writer(); - + let lsn = self.lsn; let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; @@ -1324,9 +1366,9 @@ pub fn create_test_timeline( timeline_id: utils::zid::ZTimelineId, ) -> Result> { let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; - m.commit(Lsn(8))?; + m.commit()?; Ok(tline) } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 8dd14ec177..b8064849e0 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -30,8 +30,6 @@ use anyhow::Result; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; -use std::collections::HashMap; - use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walrecord::*; @@ -48,8 +46,6 @@ pub struct WalIngest<'a, T: DatadirTimeline> { checkpoint: CheckPoint, checkpoint_modified: bool, - - relsize_cache: HashMap, } impl<'a, T: DatadirTimeline> WalIngest<'a, T> { @@ -64,13 +60,13 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { timeline, checkpoint, checkpoint_modified: false, - relsize_cache: HashMap::new(), }) } /// /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline. /// + /// This function updates `lsn` field of `DatadirModification` /// /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the /// relations/pages that the record affects. @@ -82,6 +78,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { modification: &mut DatadirModification, decoded: &mut DecodedWALRecord, ) -> Result<()> { + modification.lsn = lsn; decode_wal_record(recdata, decoded).context("failed decoding wal record")?; let mut buf = decoded.record.clone(); @@ -260,7 +257,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit(lsn)?; + modification.commit()?; Ok(()) } @@ -408,7 +405,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { // replaying it would fail to find the previous image of the page, because // it doesn't exist. So check if the VM page(s) exist, and skip the WAL // record if it doesn't. - let vm_size = self.get_relsize(vm_rel)?; + let vm_size = self.get_relsize(vm_rel, modification.lsn)?; if let Some(blknum) = new_vm_blk { if blknum >= vm_size { new_vm_blk = None; @@ -880,7 +877,6 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { modification: &mut DatadirModification, rel: RelTag, ) -> Result<()> { - self.relsize_cache.insert(rel, 0); modification.put_rel_creation(rel, 0)?; Ok(()) } @@ -916,7 +912,6 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { nblocks: BlockNumber, ) -> Result<()> { modification.put_rel_truncation(rel, nblocks)?; - self.relsize_cache.insert(rel, nblocks); Ok(()) } @@ -926,23 +921,16 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { rel: RelTag, ) -> Result<()> { modification.put_rel_drop(rel)?; - self.relsize_cache.remove(&rel); Ok(()) } - fn get_relsize(&mut self, rel: RelTag) -> Result { - if let Some(nblocks) = self.relsize_cache.get(&rel) { - Ok(*nblocks) + fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> Result { + let nblocks = if !self.timeline.get_rel_exists(rel, lsn)? { + 0 } else { - let last_lsn = self.timeline.get_last_record_lsn(); - let nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? { - 0 - } else { - self.timeline.get_rel_size(rel, last_lsn)? - }; - self.relsize_cache.insert(rel, nblocks); - Ok(nblocks) - } + self.timeline.get_rel_size(rel, lsn)? + }; + Ok(nblocks) } fn handle_rel_extend( @@ -952,22 +940,16 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { blknum: BlockNumber, ) -> Result<()> { let new_nblocks = blknum + 1; - let old_nblocks = if let Some(nblocks) = self.relsize_cache.get(&rel) { - *nblocks + // Check if the relation exists. We implicitly create relations on first + // record. + // TODO: would be nice if to be more explicit about it + let last_lsn = modification.lsn; + let old_nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? { + // create it with 0 size initially, the logic below will extend it + modification.put_rel_creation(rel, 0)?; + 0 } else { - // Check if the relation exists. We implicitly create relations on first - // record. - // TODO: would be nice if to be more explicit about it - let last_lsn = self.timeline.get_last_record_lsn(); - let nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? { - // create it with 0 size initially, the logic below will extend it - modification.put_rel_creation(rel, 0)?; - 0 - } else { - self.timeline.get_rel_size(rel, last_lsn)? - }; - self.relsize_cache.insert(rel, nblocks); - nblocks + self.timeline.get_rel_size(rel, last_lsn)? }; if new_nblocks > old_nblocks { @@ -978,7 +960,6 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { for gap_blknum in old_nblocks..blknum { modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?; } - self.relsize_cache.insert(rel, new_nblocks); } Ok(()) } @@ -1069,10 +1050,10 @@ mod tests { static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); fn init_walingest_test(tline: &T) -> Result> { - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file - m.commit(Lsn(0x10))?; + m.commit()?; let walingest = WalIngest::new(tline, Lsn(0x10))?; Ok(walingest) @@ -1084,19 +1065,19 @@ mod tests { let tline = create_test_timeline(repo, TIMELINE_ID)?; let mut walingest = init_walingest_test(&*tline)?; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x20)); walingest.put_rel_creation(&mut m, TESTREL_A)?; walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?; - m.commit(Lsn(0x20))?; - let mut m = tline.begin_modification(); + m.commit()?; + let mut m = tline.begin_modification(Lsn(0x30)); walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?; - m.commit(Lsn(0x30))?; - let mut m = tline.begin_modification(); + m.commit()?; + let mut m = tline.begin_modification(Lsn(0x40)); walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?; - m.commit(Lsn(0x40))?; - let mut m = tline.begin_modification(); + m.commit()?; + let mut m = tline.begin_modification(Lsn(0x50)); walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?; - m.commit(Lsn(0x50))?; + m.commit()?; assert_current_logical_size(&*tline, Lsn(0x50)); @@ -1142,9 +1123,9 @@ mod tests { ); // Truncate last block - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x60)); walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?; - m.commit(Lsn(0x60))?; + m.commit()?; assert_current_logical_size(&*tline, Lsn(0x60)); // Check reported size and contents after truncation @@ -1166,15 +1147,15 @@ mod tests { ); // Truncate to zero length - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x68)); walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?; - m.commit(Lsn(0x68))?; + m.commit()?; assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0); // Extend from 0 to 2 blocks, leaving a gap - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x70)); walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?; - m.commit(Lsn(0x70))?; + m.commit()?; assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2); assert_eq!( tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?, @@ -1186,9 +1167,9 @@ mod tests { ); // Extend a lot more, leaving a big gap that spans across segments - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x80)); walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?; - m.commit(Lsn(0x80))?; + m.commit()?; assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501); for blk in 2..1500 { assert_eq!( @@ -1212,18 +1193,18 @@ mod tests { let tline = create_test_timeline(repo, TIMELINE_ID)?; let mut walingest = init_walingest_test(&*tline)?; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x20)); walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?; - m.commit(Lsn(0x20))?; + m.commit()?; // Check that rel exists and size is correct assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true); assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1); // Drop rel - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x30)); walingest.put_rel_drop(&mut m, TESTREL_A)?; - m.commit(Lsn(0x30))?; + m.commit()?; // Check that rel is not visible anymore assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false); @@ -1232,9 +1213,9 @@ mod tests { //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none()); // Re-create it - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x40)); walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?; - m.commit(Lsn(0x40))?; + m.commit()?; // Check that rel exists and size is correct assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true); @@ -1254,12 +1235,12 @@ mod tests { // Create a 20 MB relation (the size is arbitrary) let relsize = 20 * 1024 * 1024 / 8192; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x20)); for blkno in 0..relsize { let data = format!("foo blk {} at {}", blkno, Lsn(0x20)); walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?; } - m.commit(Lsn(0x20))?; + m.commit()?; // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false); @@ -1280,9 +1261,9 @@ mod tests { // Truncate relation so that second segment was dropped // - only leave one page - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(0x60)); walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?; - m.commit(Lsn(0x60))?; + m.commit()?; // Check reported size and contents after truncation assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1); @@ -1310,12 +1291,12 @@ mod tests { // Extend relation again. // Add enough blocks to create second segment let lsn = Lsn(0x80); - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(lsn); for blkno in 0..relsize { let data = format!("foo blk {} at {}", blkno, lsn); walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?; } - m.commit(lsn)?; + m.commit()?; assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true); assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize); @@ -1343,10 +1324,10 @@ mod tests { let mut lsn = 0x10; for blknum in 0..pg_constants::RELSEG_SIZE + 1 { lsn += 0x10; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(lsn)); let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn))); walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?; - m.commit(Lsn(lsn))?; + m.commit()?; } assert_current_logical_size(&*tline, Lsn(lsn)); @@ -1358,9 +1339,9 @@ mod tests { // Truncate one block lsn += 0x10; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(lsn)); walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?; - m.commit(Lsn(lsn))?; + m.commit()?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn))?, pg_constants::RELSEG_SIZE @@ -1369,9 +1350,9 @@ mod tests { // Truncate another block lsn += 0x10; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(lsn)); walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?; - m.commit(Lsn(lsn))?; + m.commit()?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn))?, pg_constants::RELSEG_SIZE - 1 @@ -1383,9 +1364,9 @@ mod tests { let mut size: i32 = 3000; while size >= 0 { lsn += 0x10; - let mut m = tline.begin_modification(); + let mut m = tline.begin_modification(Lsn(lsn)); walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?; - m.commit(Lsn(lsn))?; + m.commit()?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn))?, size as BlockNumber diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index fbd9ccd3c5..c4e66bdb95 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -154,7 +154,7 @@ pub async fn handle_walreceiver_connection( { let mut decoded = DecodedWALRecord::default(); - let mut modification = timeline.begin_modification(); + let mut modification = timeline.begin_modification(endlsn); while let Some((lsn, recdata)) = waldecoder.poll_decode()? { // let _enter = info_span!("processing record", lsn = %lsn).entered();