diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index ed452eae7d..7e5ae892ad 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -23,6 +23,7 @@ use tracing::*; use tokio_tar::{Builder, EntryType, Header}; use crate::context::RequestContext; +use crate::pgdatadir_mapping::Version; use crate::tenant::Timeline; use pageserver_api::reltag::{RelTag, SlruKind}; @@ -174,7 +175,7 @@ where ] { for segno in self .timeline - .list_slru_segments(kind, self.lsn, self.ctx) + .list_slru_segments(kind, Version::Lsn(self.lsn), self.ctx) .await? { self.add_slru_segment(kind, segno).await?; @@ -192,7 +193,7 @@ where // Otherwise only include init forks of unlogged relations. let rels = self .timeline - .list_rels(spcnode, dbnode, self.lsn, self.ctx) + .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) .await?; for &rel in rels.iter() { // Send init fork as main fork to provide well formed empty @@ -267,7 +268,7 @@ where async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> { let nblocks = self .timeline - .get_rel_size(src, self.lsn, false, self.ctx) + .get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx) .await?; // If the relation is empty, create an empty file @@ -288,7 +289,7 @@ where for blknum in startblk..endblk { let img = self .timeline - .get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx) + .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx) .await?; segment_data.extend_from_slice(&img[..]); } @@ -310,7 +311,7 @@ where async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> { let nblocks = self .timeline - .get_slru_segment_size(slru, segno, self.lsn, self.ctx) + .get_slru_segment_size(slru, segno, Version::Lsn(self.lsn), self.ctx) .await?; let mut slru_buf: Vec = Vec::with_capacity(nblocks as usize * BLCKSZ as usize); @@ -352,7 +353,7 @@ where let relmap_img = if has_relmap_file { let img = self .timeline - .get_relmap_file(spcnode, dbnode, self.lsn, self.ctx) + .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) .await?; ensure!( @@ -399,7 +400,7 @@ where if !has_relmap_file && self .timeline - .list_rels(spcnode, dbnode, self.lsn, self.ctx) + .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx) .await? .is_empty() { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 8516f397ca..4560f5eca0 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -76,6 +76,8 @@ pub mod defaults { pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8; + pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100; + /// /// Default built-in configuration file. /// @@ -88,6 +90,7 @@ pub mod defaults { #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}' #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}' +#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE} #max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS} # initial superuser role name to use when creating a new tenant @@ -108,6 +111,8 @@ pub mod defaults { #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' +#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE} + [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -233,6 +238,9 @@ pub struct PageServerConf { /// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize /// heatmap uploads vs. other remote storage operations. pub heatmap_upload_concurrency: usize, + + /// Maximum number of WAL records to be ingested and committed at the same time + pub ingest_batch_size: u64, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -314,6 +322,8 @@ struct PageServerConfigBuilder { control_plane_emergency_mode: BuilderValue, heatmap_upload_concurrency: BuilderValue, + + ingest_batch_size: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -386,6 +396,8 @@ impl Default for PageServerConfigBuilder { control_plane_emergency_mode: Set(false), heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY), + + ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE), } } } @@ -534,6 +546,10 @@ impl PageServerConfigBuilder { self.heatmap_upload_concurrency = BuilderValue::Set(value) } + pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) { + self.ingest_batch_size = BuilderValue::Set(ingest_batch_size) + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_warmup = self .concurrent_tenant_warmup @@ -632,10 +648,12 @@ impl PageServerConfigBuilder { control_plane_emergency_mode: self .control_plane_emergency_mode .ok_or(anyhow!("missing control_plane_emergency_mode"))?, - heatmap_upload_concurrency: self .heatmap_upload_concurrency .ok_or(anyhow!("missing heatmap_upload_concurrency"))?, + ingest_batch_size: self + .ingest_batch_size + .ok_or(anyhow!("missing ingest_batch_size"))?, }) } } @@ -878,6 +896,7 @@ impl PageServerConf { "heatmap_upload_concurrency" => { builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize) }, + "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -949,6 +968,7 @@ impl PageServerConf { control_plane_api_token: None, control_plane_emergency_mode: false, heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, } } } @@ -1177,7 +1197,8 @@ background_task_maximum_delay = '334 s' control_plane_api: None, control_plane_api_token: None, control_plane_emergency_mode: false, - heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, }, "Correct defaults should be used when no config values are provided" ); @@ -1238,7 +1259,8 @@ background_task_maximum_delay = '334 s' control_plane_api: None, control_plane_api_token: None, control_plane_emergency_mode: false, - heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + ingest_batch_size: 100, }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index d95d75449d..d66df36b3a 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -21,6 +21,7 @@ use tracing::*; use walkdir::WalkDir; use crate::context::RequestContext; +use crate::metrics::WAL_INGEST; use crate::pgdatadir_mapping::*; use crate::tenant::remote_timeline_client::INITDB_PATH; use crate::tenant::Timeline; @@ -312,13 +313,16 @@ async fn import_wal( waldecoder.feed_bytes(&buf); let mut nrecords = 0; - let mut modification = tline.begin_modification(endpoint); + let mut modification = tline.begin_modification(last_lsn); let mut decoded = DecodedWALRecord::default(); while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx) .await?; + WAL_INGEST.records_committed.inc(); + + modification.commit(ctx).await?; last_lsn = lsn; nrecords += 1; @@ -448,13 +452,14 @@ pub async fn import_wal_from_tar( waldecoder.feed_bytes(&bytes[offset..]); - let mut modification = tline.begin_modification(end_lsn); + let mut modification = tline.begin_modification(last_lsn); let mut decoded = DecodedWALRecord::default(); while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx) .await?; + modification.commit(ctx).await?; last_lsn = lsn; debug!("imported record at {} (end {})", lsn, end_lsn); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d5ca7f7382..db07a600e5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -53,7 +53,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::import_datadir::import_wal_from_tar; use crate::metrics; use crate::metrics::LIVE_CONNECTIONS_COUNT; -use crate::pgdatadir_mapping::rel_block_to_key; +use crate::pgdatadir_mapping::{rel_block_to_key, Version}; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; @@ -747,7 +747,7 @@ impl PageServerHandler { .await?; let exists = timeline - .get_rel_exists(req.rel, lsn, req.latest, ctx) + .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx) .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { @@ -766,7 +766,9 @@ impl PageServerHandler { Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) .await?; - let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?; + let n_blocks = timeline + .get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx) + .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { n_blocks, @@ -785,7 +787,13 @@ impl PageServerHandler { .await?; let total_blocks = timeline - .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx) + .get_db_size( + DEFAULTTABLESPACE_OID, + req.dbnode, + Version::Lsn(lsn), + req.latest, + ctx, + ) .await?; let db_size = total_blocks as i64 * BLCKSZ as i64; @@ -816,7 +824,7 @@ impl PageServerHandler { let key = rel_block_to_key(req.rel, req.blkno); let page = if timeline.get_shard_identity().is_key_local(&key) { timeline - .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx) + .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx) .await? } else { // The Tenant shard we looked up at connection start does not hold this particular @@ -853,7 +861,7 @@ impl PageServerHandler { // the GateGuard was already held over the whole connection. let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?; timeline - .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx) + .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx) .await? }; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index e9884a15f5..9fe75e5baf 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -11,7 +11,7 @@ use crate::context::RequestContext; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::repository::*; use crate::walrecord::NeonWalRecord; -use anyhow::Context; +use anyhow::{ensure, Context}; use bytes::{Buf, Bytes}; use pageserver_api::key::is_rel_block_key; use pageserver_api::reltag::{RelTag, SlruKind}; @@ -147,6 +147,7 @@ impl Timeline { { DatadirModification { tline: self, + pending_lsns: Vec::new(), pending_updates: HashMap::new(), pending_deletions: Vec::new(), pending_nblocks: 0, @@ -163,7 +164,7 @@ impl Timeline { &self, tag: RelTag, blknum: BlockNumber, - lsn: Lsn, + version: Version<'_>, latest: bool, ctx: &RequestContext, ) -> Result { @@ -173,17 +174,20 @@ impl Timeline { )); } - let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?; + let nblocks = self.get_rel_size(tag, version, latest, ctx).await?; if blknum >= nblocks { debug!( "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", - tag, blknum, lsn, nblocks + tag, + blknum, + version.get_lsn(), + nblocks ); return Ok(ZERO_PAGE.clone()); } let key = rel_block_to_key(tag, blknum); - self.get(key, lsn, ctx).await + version.get(self, key, ctx).await } // Get size of a database in blocks @@ -191,16 +195,16 @@ impl Timeline { &self, spcnode: Oid, dbnode: Oid, - lsn: Lsn, + version: Version<'_>, latest: bool, ctx: &RequestContext, ) -> Result { let mut total_blocks = 0; - let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?; + let rels = self.list_rels(spcnode, dbnode, version, ctx).await?; for rel in rels { - let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?; + let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?; total_blocks += n_blocks as usize; } Ok(total_blocks) @@ -210,7 +214,7 @@ impl Timeline { pub async fn get_rel_size( &self, tag: RelTag, - lsn: Lsn, + version: Version<'_>, latest: bool, ctx: &RequestContext, ) -> Result { @@ -220,12 +224,12 @@ impl Timeline { )); } - if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { + if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { return Ok(nblocks); } if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM) - && !self.get_rel_exists(tag, lsn, latest, ctx).await? + && !self.get_rel_exists(tag, version, latest, ctx).await? { // FIXME: Postgres sometimes calls smgrcreate() to create // FSM, and smgrnblocks() on it immediately afterwards, @@ -235,7 +239,7 @@ impl Timeline { } let key = rel_size_to_key(tag); - let mut buf = self.get(key, lsn, ctx).await?; + let mut buf = version.get(self, key, ctx).await?; let nblocks = buf.get_u32_le(); if latest { @@ -246,7 +250,7 @@ impl Timeline { // latest=true, then it can not cause cache corruption, because with latest=true // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be // associated with most recent value of LSN. - self.update_cached_rel_size(tag, lsn, nblocks); + self.update_cached_rel_size(tag, version.get_lsn(), nblocks); } Ok(nblocks) } @@ -255,7 +259,7 @@ impl Timeline { pub async fn get_rel_exists( &self, tag: RelTag, - lsn: Lsn, + version: Version<'_>, _latest: bool, ctx: &RequestContext, ) -> Result { @@ -266,12 +270,12 @@ impl Timeline { } // first try to lookup relation in cache - if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) { + if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) { return Ok(true); } // fetch directory listing let key = rel_dir_to_key(tag.spcnode, tag.dbnode); - let buf = self.get(key, lsn, ctx).await?; + let buf = version.get(self, key, ctx).await?; match RelDirectory::des(&buf).context("deserialization failure") { Ok(dir) => { @@ -291,12 +295,12 @@ impl Timeline { &self, spcnode: Oid, dbnode: Oid, - lsn: Lsn, + version: Version<'_>, ctx: &RequestContext, ) -> Result, PageReconstructError> { // fetch directory listing let key = rel_dir_to_key(spcnode, dbnode); - let buf = self.get(key, lsn, ctx).await?; + let buf = version.get(self, key, ctx).await?; match RelDirectory::des(&buf).context("deserialization failure") { Ok(dir) => { @@ -332,11 +336,11 @@ impl Timeline { &self, kind: SlruKind, segno: u32, - lsn: Lsn, + version: Version<'_>, ctx: &RequestContext, ) -> Result { let key = slru_segment_size_to_key(kind, segno); - let mut buf = self.get(key, lsn, ctx).await?; + let mut buf = version.get(self, key, ctx).await?; Ok(buf.get_u32_le()) } @@ -345,12 +349,12 @@ impl Timeline { &self, kind: SlruKind, segno: u32, - lsn: Lsn, + version: Version<'_>, ctx: &RequestContext, ) -> Result { // fetch directory listing let key = slru_dir_to_key(kind); - let buf = self.get(key, lsn, ctx).await?; + let buf = version.get(self, key, ctx).await?; match SlruSegmentDirectory::des(&buf).context("deserialization failure") { Ok(dir) => { @@ -501,11 +505,11 @@ impl Timeline { mut f: impl FnMut(TimestampTz) -> ControlFlow, ) -> Result { for segno in self - .list_slru_segments(SlruKind::Clog, probe_lsn, ctx) + .list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx) .await? { let nblocks = self - .get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx) + .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx) .await?; for blknum in (0..nblocks).rev() { let clog_page = self @@ -531,13 +535,13 @@ impl Timeline { pub async fn list_slru_segments( &self, kind: SlruKind, - lsn: Lsn, + version: Version<'_>, ctx: &RequestContext, ) -> Result, PageReconstructError> { // fetch directory entry let key = slru_dir_to_key(kind); - let buf = self.get(key, lsn, ctx).await?; + let buf = version.get(self, key, ctx).await?; match SlruSegmentDirectory::des(&buf).context("deserialization failure") { Ok(dir) => Ok(dir.segments), Err(e) => Err(PageReconstructError::from(e)), @@ -548,12 +552,12 @@ impl Timeline { &self, spcnode: Oid, dbnode: Oid, - lsn: Lsn, + version: Version<'_>, ctx: &RequestContext, ) -> Result { let key = relmap_file_key(spcnode, dbnode); - let buf = self.get(key, lsn, ctx).await?; + let buf = version.get(self, key, ctx).await?; Ok(buf) } @@ -652,7 +656,10 @@ impl Timeline { let mut total_size: u64 = 0; for (spcnode, dbnode) in dbdir.dbdirs.keys() { - for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? { + for rel in self + .list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx) + .await? + { if self.cancel.is_cancelled() { return Err(CalculateLogicalSizeError::Cancelled); } @@ -692,7 +699,7 @@ impl Timeline { result.add_key(rel_dir_to_key(spcnode, dbnode)); let mut rels: Vec = self - .list_rels(spcnode, dbnode, lsn, ctx) + .list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx) .await? .into_iter() .collect(); @@ -799,18 +806,39 @@ pub struct DatadirModification<'a> { /// in the state in 'tline' yet. pub tline: &'a Timeline, - /// Lsn assigned by begin_modification - pub lsn: Lsn, + /// Current LSN of the modification + lsn: Lsn, // The modifications are not applied directly to the underlying key-value store. // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. - pending_updates: HashMap, - pending_deletions: Vec>, + pending_lsns: Vec, + pending_updates: HashMap>, + pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, } impl<'a> DatadirModification<'a> { + /// Get the current lsn + pub(crate) fn get_lsn(&self) -> Lsn { + self.lsn + } + + /// Set the current lsn + pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> { + ensure!( + lsn >= self.lsn, + "setting an older lsn {} than {} is not allowed", + lsn, + self.lsn + ); + if lsn > self.lsn { + self.pending_lsns.push(self.lsn); + self.lsn = lsn; + } + Ok(()) + } + /// Initialize a completely new repository. /// /// This inserts the directory metadata entries that are assumed to @@ -984,11 +1012,9 @@ impl<'a> DatadirModification<'a> { dbnode: Oid, ctx: &RequestContext, ) -> anyhow::Result<()> { - let req_lsn = self.tline.get_last_record_lsn(); - let total_blocks = self .tline - .get_db_size(spcnode, dbnode, req_lsn, true, ctx) + .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx) .await?; // Remove entry from dbdir @@ -1077,8 +1103,11 @@ impl<'a> DatadirModification<'a> { ctx: &RequestContext, ) -> anyhow::Result<()> { anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); - let last_lsn = self.tline.get_last_record_lsn(); - if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? { + if self + .tline + .get_rel_exists(rel, Version::Modified(self), true, ctx) + .await? + { let size_key = rel_size_to_key(rel); // Fetch the old size first let old_size = self.get(size_key, ctx).await?.get_u32_le(); @@ -1323,17 +1352,23 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; // Flush relation and SLRU data blocks, keep metadata. - let mut retained_pending_updates = HashMap::new(); - for (key, value) in self.pending_updates.drain() { - if is_rel_block_key(&key) || is_slru_block_key(key) { - // This bails out on first error without modifying pending_updates. - // That's Ok, cf this function's doc comment. - writer.put(key, self.lsn, &value, ctx).await?; - } else { - retained_pending_updates.insert(key, value); + let mut retained_pending_updates = HashMap::<_, Vec<_>>::new(); + for (key, values) in self.pending_updates.drain() { + for (lsn, value) in values { + if is_rel_block_key(&key) || is_slru_block_key(key) { + // This bails out on first error without modifying pending_updates. + // That's Ok, cf this function's doc comment. + writer.put(key, lsn, &value, ctx).await?; + } else { + retained_pending_updates + .entry(key) + .or_default() + .push((lsn, value)); + } } } - self.pending_updates.extend(retained_pending_updates); + + self.pending_updates = retained_pending_updates; if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1350,18 +1385,28 @@ impl<'a> DatadirModification<'a> { /// pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> { let writer = self.tline.writer().await; - let lsn = self.lsn; + let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; - for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value, ctx).await?; - } - for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn).await?; + if !self.pending_updates.is_empty() { + writer.put_batch(&self.pending_updates, ctx).await?; + self.pending_updates.clear(); } - writer.finish_write(lsn); + if !self.pending_deletions.is_empty() { + writer.delete_batch(&self.pending_deletions).await?; + self.pending_deletions.clear(); + } + + self.pending_lsns.push(self.lsn); + for pending_lsn in self.pending_lsns.drain(..) { + // Ideally, we should be able to call writer.finish_write() only once + // with the highest LSN. However, the last_record_lsn variable in the + // timeline keeps track of the latest LSN and the immediate previous LSN + // so we need to record every LSN to not leave a gap between them. + writer.finish_write(pending_lsn); + } if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1370,44 +1415,86 @@ impl<'a> DatadirModification<'a> { Ok(()) } - pub(crate) fn is_empty(&self) -> bool { - self.pending_updates.is_empty() && self.pending_deletions.is_empty() + pub(crate) fn len(&self) -> usize { + self.pending_updates.len() + self.pending_deletions.len() } // Internal helper functions to batch the modifications async fn get(&self, key: Key, ctx: &RequestContext) -> Result { - // Have we already updated the same key? Read the pending updated + // Have we already updated the same key? Read the latest pending updated // version in that case. // // Note: we don't check pending_deletions. It is an error to request a // value that has been removed, deletion only avoids leaking storage. - if let Some(value) = self.pending_updates.get(&key) { - if let Value::Image(img) = value { - Ok(img.clone()) - } else { - // Currently, we never need to read back a WAL record that we - // inserted in the same "transaction". All the metadata updates - // work directly with Images, and we never need to read actual - // data pages. We could handle this if we had to, by calling - // the walredo manager, but let's keep it simple for now. - Err(PageReconstructError::from(anyhow::anyhow!( - "unexpected pending WAL record" - ))) + if let Some(values) = self.pending_updates.get(&key) { + if let Some((_, value)) = values.last() { + return if let Value::Image(img) = value { + Ok(img.clone()) + } else { + // Currently, we never need to read back a WAL record that we + // inserted in the same "transaction". All the metadata updates + // work directly with Images, and we never need to read actual + // data pages. We could handle this if we had to, by calling + // the walredo manager, but let's keep it simple for now. + Err(PageReconstructError::from(anyhow::anyhow!( + "unexpected pending WAL record" + ))) + }; } - } else { - let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn); - self.tline.get(key, lsn, ctx).await } + let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn); + self.tline.get(key, lsn, ctx).await } fn put(&mut self, key: Key, val: Value) { - self.pending_updates.insert(key, val); + let values = self.pending_updates.entry(key).or_default(); + // Replace the previous value if it exists at the same lsn + if let Some((last_lsn, last_value)) = values.last_mut() { + if *last_lsn == self.lsn { + *last_value = val; + return; + } + } + values.push((self.lsn, val)); } fn delete(&mut self, key_range: Range) { trace!("DELETE {}-{}", key_range.start, key_range.end); - self.pending_deletions.push(key_range); + self.pending_deletions.push((key_range, self.lsn)); + } +} + +/// This struct facilitates accessing either a committed key from the timeline at a +/// specific LSN, or the latest uncommitted key from a pending modification. +/// During WAL ingestion, the records from multiple LSNs may be batched in the same +/// modification before being flushed to the timeline. Hence, the routines in WalIngest +/// need to look up the keys in the modification first before looking them up in the +/// timeline to not miss the latest updates. +#[derive(Clone, Copy)] +pub enum Version<'a> { + Lsn(Lsn), + Modified(&'a DatadirModification<'a>), +} + +impl<'a> Version<'a> { + async fn get( + &self, + timeline: &Timeline, + key: Key, + ctx: &RequestContext, + ) -> Result { + match self { + Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await, + Version::Modified(modification) => modification.get(key, ctx).await, + } + } + + fn get_lsn(&self) -> Lsn { + match self { + Version::Lsn(lsn) => *lsn, + Version::Modified(modification) => modification.lsn, + } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 25d97f51ce..2d4cd350d7 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -46,6 +46,8 @@ pub mod defaults { pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds"; pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024; pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour"; + + pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100; } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 003cf0e92b..7c9103eea8 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -23,7 +23,7 @@ use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::ops::Range; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, RwLockWriteGuard}; use super::{DeltaLayerWriter, ResidentLayer}; @@ -246,16 +246,43 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - pub async fn put_value( + pub(crate) async fn put_value( &self, key: Key, lsn: Lsn, val: &Value, ctx: &RequestContext, ) -> Result<()> { - trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); - let inner: &mut _ = &mut *self.inner.write().await; + let mut inner = self.inner.write().await; self.assert_writable(); + self.put_value_locked(&mut inner, key, lsn, val, ctx).await + } + + pub(crate) async fn put_values( + &self, + values: &HashMap>, + ctx: &RequestContext, + ) -> Result<()> { + let mut inner = self.inner.write().await; + self.assert_writable(); + for (key, vals) in values { + for (lsn, val) in vals { + self.put_value_locked(&mut inner, *key, *lsn, val, ctx) + .await?; + } + } + Ok(()) + } + + async fn put_value_locked( + &self, + locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>, + key: Key, + lsn: Lsn, + val: &Value, + ctx: &RequestContext, + ) -> Result<()> { + trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); let off = { // Avoid doing allocations for "small" values. @@ -264,7 +291,7 @@ impl InMemoryLayer { let mut buf = smallvec::SmallVec::<[u8; 256]>::new(); buf.clear(); val.ser_into(&mut buf)?; - inner + locked_inner .file .write_blob( &buf, @@ -275,7 +302,7 @@ impl InMemoryLayer { .await? }; - let vec_map = inner.index.entry(key).or_default(); + let vec_map = locked_inner.index.entry(key).or_default(); let old = vec_map.append_or_update_last(lsn, off).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -285,13 +312,11 @@ impl InMemoryLayer { Ok(()) } - pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range, Lsn)]) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys - Ok(()) } - /// Make the layer non-writeable. Only call once. /// Records the end_lsn for non-dropped layers. /// `end_lsn` is exclusive pub async fn freeze(&self, end_lsn: Lsn) { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1e84fa1848..15a5ca1727 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1459,6 +1459,7 @@ impl Timeline { max_lsn_wal_lag, auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(), availability_zone: self.conf.availability_zone.clone(), + ingest_batch_size: self.conf.ingest_batch_size, }, broker_client, ctx, @@ -2471,9 +2472,27 @@ impl Timeline { Ok(()) } - async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn).await?; - layer.put_tombstone(key_range, lsn).await?; + async fn put_values( + &self, + values: &HashMap>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + // Pick the first LSN in the batch to get the layer to write to. + for lsns in values.values() { + if let Some((lsn, _)) = lsns.first() { + let layer = self.get_layer_for_write(*lsn).await?; + layer.put_values(values, ctx).await?; + break; + } + } + Ok(()) + } + + async fn put_tombstones(&self, tombstones: &[(Range, Lsn)]) -> anyhow::Result<()> { + if let Some((_, lsn)) = tombstones.first() { + let layer = self.get_layer_for_write(*lsn).await?; + layer.put_tombstones(tombstones).await?; + } Ok(()) } @@ -4529,8 +4548,16 @@ impl<'a> TimelineWriter<'a> { self.tl.put_value(key, lsn, value, ctx).await } - pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn).await + pub(crate) async fn put_batch( + &self, + batch: &HashMap>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.tl.put_values(batch, ctx).await + } + + pub(crate) async fn delete_batch(&self, batch: &[(Range, Lsn)]) -> anyhow::Result<()> { + self.tl.put_tombstones(batch).await } /// Track the end of the latest digested WAL record. @@ -4541,11 +4568,11 @@ impl<'a> TimelineWriter<'a> { /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for /// the 'lsn' or anything older. The previous last record LSN is stored alongside /// the latest and can be read. - pub fn finish_write(&self, new_lsn: Lsn) { + pub(crate) fn finish_write(&self, new_lsn: Lsn) { self.tl.finish_write(new_lsn); } - pub fn update_current_logical_size(&self, delta: i64) { + pub(crate) fn update_current_logical_size(&self, delta: i64) { self.tl.update_current_logical_size(delta) } } diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index e32265afb5..2fab6722b8 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -58,6 +58,7 @@ pub struct WalReceiverConf { pub max_lsn_wal_lag: NonZeroU64, pub auth_token: Option>, pub availability_zone: Option, + pub ingest_batch_size: u64, } pub struct WalReceiver { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 5a5b3d7586..7fa5bb7689 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -411,6 +411,7 @@ impl ConnectionManagerState { let node_id = new_sk.safekeeper_id; let connect_timeout = self.conf.wal_connect_timeout; + let ingest_batch_size = self.conf.ingest_batch_size; let timeline = Arc::clone(&self.timeline); let ctx = ctx.detached_child( TaskKind::WalReceiverConnectionHandler, @@ -430,6 +431,7 @@ impl ConnectionManagerState { connect_timeout, ctx, node_id, + ingest_batch_size, ) .await; @@ -1345,6 +1347,7 @@ mod tests { max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(), auth_token: None, availability_zone: None, + ingest_batch_size: 1, }, wal_connection: None, wal_stream_candidates: HashMap::new(), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 61ab236322..e398d683e5 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument}; use super::TaskStateUpdate; use crate::{ context::RequestContext, - metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS}, + metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST}, task_mgr, task_mgr::TaskKind, task_mgr::WALRECEIVER_RUNTIME, @@ -106,6 +106,7 @@ impl From for WalReceiverError { /// Open a connection to the given safekeeper and receive WAL, sending back progress /// messages as we go. +#[allow(clippy::too_many_arguments)] pub(super) async fn handle_walreceiver_connection( timeline: Arc, wal_source_connconf: PgConnectionConfig, @@ -114,6 +115,7 @@ pub(super) async fn handle_walreceiver_connection( connect_timeout: Duration, ctx: RequestContext, node: NodeId, + ingest_batch_size: u64, ) -> Result<(), WalReceiverError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -305,7 +307,9 @@ pub(super) async fn handle_walreceiver_connection( { let mut decoded = DecodedWALRecord::default(); - let mut modification = timeline.begin_modification(endlsn); + let mut modification = timeline.begin_modification(startlsn); + let mut uncommitted_records = 0; + let mut filtered_records = 0; while let Some((lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are @@ -314,14 +318,40 @@ pub(super) async fn handle_walreceiver_connection( return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); } - walingest + // Ingest the records without immediately committing them. + let ingested = walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) .await .with_context(|| format!("could not ingest record at {lsn}"))?; + if !ingested { + tracing::debug!("ingest: filtered out record @ LSN {lsn}"); + WAL_INGEST.records_filtered.inc(); + filtered_records += 1; + } fail_point!("walreceiver-after-ingest"); last_rec_lsn = lsn; + + // Commit every ingest_batch_size records. Even if we filtered out + // all records, we still need to call commit to advance the LSN. + uncommitted_records += 1; + if uncommitted_records >= ingest_batch_size { + WAL_INGEST + .records_committed + .inc_by(uncommitted_records - filtered_records); + modification.commit(&ctx).await?; + uncommitted_records = 0; + filtered_records = 0; + } + } + + // Commit the remaining records. + if uncommitted_records > 0 { + WAL_INGEST + .records_committed + .inc_by(uncommitted_records - filtered_records); + modification.commit(&ctx).await?; } } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index a6a8972970..8df0c81c7a 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -48,20 +48,18 @@ use postgres_ffi::TransactionId; use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; -pub struct WalIngest<'a> { +pub struct WalIngest { shard: ShardIdentity, - timeline: &'a Timeline, - checkpoint: CheckPoint, checkpoint_modified: bool, } -impl<'a> WalIngest<'a> { +impl WalIngest { pub async fn new( - timeline: &'a Timeline, + timeline: &Timeline, startpoint: Lsn, - ctx: &'_ RequestContext, - ) -> anyhow::Result> { + ctx: &RequestContext, + ) -> anyhow::Result { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?; @@ -70,7 +68,6 @@ impl<'a> WalIngest<'a> { Ok(WalIngest { shard: *timeline.get_shard_identity(), - timeline, checkpoint, checkpoint_modified: false, }) @@ -84,6 +81,8 @@ impl<'a> WalIngest<'a> { /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the /// relations/pages that the record affects. /// + /// This function returns `true` if the record was ingested, and `false` if it was filtered out + /// pub async fn ingest_record( &mut self, recdata: Bytes, @@ -91,11 +90,13 @@ impl<'a> WalIngest<'a> { modification: &mut DatadirModification<'_>, decoded: &mut DecodedWALRecord, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { WAL_INGEST.records_received.inc(); + let pg_version = modification.tline.pg_version; + let prev_len = modification.len(); - modification.lsn = lsn; - decode_wal_record(recdata, decoded, self.timeline.pg_version)?; + modification.set_lsn(lsn)?; + decode_wal_record(recdata, decoded, pg_version)?; let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -132,9 +133,9 @@ impl<'a> WalIngest<'a> { } pg_constants::RM_DBASE_ID => { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - debug!(%info, pg_version=%self.timeline.pg_version, "handle RM_DBASE_ID"); + debug!(%info, %pg_version, "handle RM_DBASE_ID"); - if self.timeline.pg_version == 14 { + if pg_version == 14 { if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { let createdb = XlCreateDatabase::decode(&mut buf); debug!("XLOG_DBASE_CREATE v14"); @@ -150,7 +151,7 @@ impl<'a> WalIngest<'a> { .await?; } } - } else if self.timeline.pg_version == 15 { + } else if pg_version == 15 { if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { @@ -170,7 +171,7 @@ impl<'a> WalIngest<'a> { .await?; } } - } else if self.timeline.pg_version == 16 { + } else if pg_version == 16 { if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { @@ -399,19 +400,11 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = false; } - if modification.is_empty() { - tracing::debug!("ingest: filtered out record @ LSN {lsn}"); - WAL_INGEST.records_filtered.inc(); - modification.tline.finish_write(lsn); - } else { - WAL_INGEST.records_committed.inc(); - modification.commit(ctx).await?; - } + // Note that at this point this record is only cached in the modification + // until commit() is called to flush the data into the repository and update + // the latest LSN. - // Now that this record has been fully handled, including updating the - // checkpoint data, let the repository know that it is up-to-date to this LSN. - - Ok(()) + Ok(modification.len() > prev_len) } /// Do not store this block, but observe it for the purposes of updating our relation size state. @@ -458,7 +451,7 @@ impl<'a> WalIngest<'a> { && (decoded.xl_info == pg_constants::XLOG_FPI || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) // compression of WAL is not yet supported: fall back to storing the original WAL record - && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)? + && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)? // do not materialize null pages because them most likely be soon replaced with real data && blk.bimg_len != 0 { @@ -511,7 +504,7 @@ impl<'a> WalIngest<'a> { let mut old_heap_blkno: Option = None; let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - match self.timeline.pg_version { + match modification.tline.pg_version { 14 => { if decoded.xl_rmid == pg_constants::RM_HEAP_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -735,7 +728,7 @@ impl<'a> WalIngest<'a> { // replaying it would fail to find the previous image of the page, because // it doesn't exist. So check if the VM page(s) exist, and skip the WAL // record if it doesn't. - let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?; + let vm_size = get_relsize(modification, vm_rel, ctx).await?; if let Some(blknum) = new_vm_blk { if blknum >= vm_size { new_vm_blk = None; @@ -816,10 +809,11 @@ impl<'a> WalIngest<'a> { let mut new_heap_blkno: Option = None; let mut old_heap_blkno: Option = None; let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; + let pg_version = modification.tline.pg_version; assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); - match self.timeline.pg_version { + match pg_version { 16 => { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -882,7 +876,7 @@ impl<'a> WalIngest<'a> { } _ => bail!( "Neon RMGR has no known compatibility with PostgreSQL version {}", - self.timeline.pg_version + pg_version ), } @@ -905,7 +899,7 @@ impl<'a> WalIngest<'a> { // replaying it would fail to find the previous image of the page, because // it doesn't exist. So check if the VM page(s) exist, and skip the WAL // record if it doesn't. - let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?; + let vm_size = get_relsize(modification, vm_rel, ctx).await?; if let Some(blknum) = new_vm_blk { if blknum >= vm_size { new_vm_blk = None; @@ -983,16 +977,14 @@ impl<'a> WalIngest<'a> { let src_db_id = rec.src_db_id; let src_tablespace_id = rec.src_tablespace_id; - // Creating a database is implemented by copying the template (aka. source) database. - // To copy all the relations, we need to ask for the state as of the same LSN, but we - // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for - // the last valid LSN to advance up to it. So we use the previous record's LSN in the - // get calls instead. - let req_lsn = modification.tline.get_last_record_lsn(); - let rels = modification .tline - .list_rels(src_tablespace_id, src_db_id, req_lsn, ctx) + .list_rels( + src_tablespace_id, + src_db_id, + Version::Modified(modification), + ctx, + ) .await?; debug!("ingest_xlog_dbase_create: {} rels", rels.len()); @@ -1000,7 +992,12 @@ impl<'a> WalIngest<'a> { // Copy relfilemap let filemap = modification .tline - .get_relmap_file(src_tablespace_id, src_db_id, req_lsn, ctx) + .get_relmap_file( + src_tablespace_id, + src_db_id, + Version::Modified(modification), + ctx, + ) .await?; modification .put_relmap_file(tablespace_id, db_id, filemap, ctx) @@ -1014,7 +1011,7 @@ impl<'a> WalIngest<'a> { let nblocks = modification .tline - .get_rel_size(src_rel, req_lsn, true, ctx) + .get_rel_size(src_rel, Version::Modified(modification), true, ctx) .await?; let dst_rel = RelTag { spcnode: tablespace_id, @@ -1032,7 +1029,13 @@ impl<'a> WalIngest<'a> { let content = modification .tline - .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true, ctx) + .get_rel_page_at_lsn( + src_rel, + blknum, + Version::Modified(modification), + true, + ctx, + ) .await?; modification.put_rel_page_image(dst_rel, blknum, content)?; num_blocks_copied += 1; @@ -1103,7 +1106,7 @@ impl<'a> WalIngest<'a> { modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?; fsm_physical_page_no += 1; } - let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?; + let nblocks = get_relsize(modification, rel, ctx).await?; if nblocks > fsm_physical_page_no { // check if something to do: FSM is larger than truncate position self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx) @@ -1125,7 +1128,7 @@ impl<'a> WalIngest<'a> { modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?; vm_page_no += 1; } - let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?; + let nblocks = get_relsize(modification, rel, ctx).await?; if nblocks > vm_page_no { // check if something to do: VM is larger than truncate position self.put_rel_truncation(modification, rel, vm_page_no, ctx) @@ -1198,10 +1201,9 @@ impl<'a> WalIngest<'a> { dbnode: xnode.dbnode, relnode: xnode.relnode, }; - let last_lsn = self.timeline.get_last_record_lsn(); if modification .tline - .get_rel_exists(rel, last_lsn, true, ctx) + .get_rel_exists(rel, Version::Modified(modification), true, ctx) .await? { self.put_rel_drop(modification, rel, ctx).await?; @@ -1255,10 +1257,9 @@ impl<'a> WalIngest<'a> { // will block waiting for the last valid LSN to advance up to // it. So we use the previous record's LSN in the get calls // instead. - let req_lsn = modification.tline.get_last_record_lsn(); for segno in modification .tline - .list_slru_segments(SlruKind::Clog, req_lsn, ctx) + .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx) .await? { let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; @@ -1470,20 +1471,6 @@ impl<'a> WalIngest<'a> { Ok(()) } - async fn get_relsize( - &mut self, - rel: RelTag, - lsn: Lsn, - ctx: &RequestContext, - ) -> anyhow::Result { - let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true, ctx).await? { - 0 - } else { - self.timeline.get_rel_size(rel, lsn, true, ctx).await? - }; - Ok(nblocks) - } - async fn handle_rel_extend( &mut self, modification: &mut DatadirModification<'_>, @@ -1495,7 +1482,6 @@ impl<'a> WalIngest<'a> { // Check if the relation exists. We implicitly create relations on first // record. // TODO: would be nice if to be more explicit about it - let last_lsn = modification.lsn; // Get current size and put rel creation if rel doesn't exist // @@ -1503,11 +1489,14 @@ impl<'a> WalIngest<'a> { // check the cache too. This is because eagerly checking the cache results in // less work overall and 10% better performance. It's more work on cache miss // but cache miss is rare. - let old_nblocks = if let Some(nblocks) = self.timeline.get_cached_rel_size(&rel, last_lsn) { + let old_nblocks = if let Some(nblocks) = modification + .tline + .get_cached_rel_size(&rel, modification.get_lsn()) + { nblocks - } else if !self - .timeline - .get_rel_exists(rel, last_lsn, true, ctx) + } else if !modification + .tline + .get_rel_exists(rel, Version::Modified(modification), true, ctx) .await? { // create it with 0 size initially, the logic below will extend it @@ -1517,7 +1506,10 @@ impl<'a> WalIngest<'a> { .context("Relation Error")?; 0 } else { - self.timeline.get_rel_size(rel, last_lsn, true, ctx).await? + modification + .tline + .get_rel_size(rel, Version::Modified(modification), true, ctx) + .await? }; if new_nblocks > old_nblocks { @@ -1570,10 +1562,9 @@ impl<'a> WalIngest<'a> { // Check if the relation exists. We implicitly create relations on first // record. // TODO: would be nice if to be more explicit about it - let last_lsn = self.timeline.get_last_record_lsn(); - let old_nblocks = if !self - .timeline - .get_slru_segment_exists(kind, segno, last_lsn, ctx) + let old_nblocks = if !modification + .tline + .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx) .await? { // create it with 0 size initially, the logic below will extend it @@ -1582,8 +1573,9 @@ impl<'a> WalIngest<'a> { .await?; 0 } else { - self.timeline - .get_slru_segment_size(kind, segno, last_lsn, ctx) + modification + .tline + .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx) .await? }; @@ -1606,6 +1598,26 @@ impl<'a> WalIngest<'a> { } } +async fn get_relsize( + modification: &DatadirModification<'_>, + rel: RelTag, + ctx: &RequestContext, +) -> anyhow::Result { + let nblocks = if !modification + .tline + .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .await? + { + 0 + } else { + modification + .tline + .get_rel_size(rel, Version::Modified(modification), true, ctx) + .await? + }; + Ok(nblocks) +} + #[allow(clippy::bool_assert_comparison)] #[cfg(test)] mod tests { @@ -1632,10 +1644,7 @@ mod tests { static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); - async fn init_walingest_test<'a>( - tline: &'a Timeline, - ctx: &RequestContext, - ) -> Result> { + async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file @@ -1680,29 +1689,29 @@ mod tests { // The relation was created at LSN 2, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) .await?, false ); assert!(tline - .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) .await .is_err()); assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, 1 ); assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, 3 ); @@ -1710,46 +1719,46 @@ mod tests { // Check page contents at each LSN assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, TEST_IMG("foo blk 0 at 2") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx) .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx) .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx) .await?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, TEST_IMG("foo blk 2 at 5") ); @@ -1765,19 +1774,19 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx) .await?, 2 ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx) .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx) .await?, TEST_IMG("foo blk 1 at 4") ); @@ -1785,13 +1794,13 @@ mod tests { // should still see the truncated block with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, 3 ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, TEST_IMG("foo blk 2 at 5") ); @@ -1804,7 +1813,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx) .await?, 0 ); @@ -1817,19 +1826,19 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx) .await?, 2 ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx) .await?, ZERO_PAGE ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx) .await?, TEST_IMG("foo blk 1") ); @@ -1842,21 +1851,21 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx) .await?, 1501 ); for blk in 2..1500 { assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx) .await?, ZERO_PAGE ); } assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx) .await?, TEST_IMG("foo blk 1500") ); @@ -1883,13 +1892,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, 1 ); @@ -1902,7 +1911,7 @@ mod tests { // Check that rel is not visible anymore assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x30), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx) .await?, false ); @@ -1920,13 +1929,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x40), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x40), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx) .await?, 1 ); @@ -1959,24 +1968,24 @@ mod tests { // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x10), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) .await?, false ); assert!(tline - .get_rel_size(TESTREL_A, Lsn(0x10), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) .await .is_err()); assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x20), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x20), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) .await?, relsize ); @@ -1987,7 +1996,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx) .await?, TEST_IMG(&data) ); @@ -2004,7 +2013,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x60), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx) .await?, 1 ); @@ -2014,7 +2023,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx) .await?, TEST_IMG(&data) ); @@ -2023,7 +2032,7 @@ mod tests { // should still see all blocks with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x50), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, relsize ); @@ -2032,7 +2041,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx) .await?, TEST_IMG(&data) ); @@ -2052,13 +2061,13 @@ mod tests { assert_eq!( tline - .get_rel_exists(TESTREL_A, Lsn(0x80), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx) .await?, relsize ); @@ -2068,7 +2077,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx) .await?, TEST_IMG(&data) ); @@ -2101,7 +2110,9 @@ mod tests { assert_current_logical_size(&tline, Lsn(lsn)); assert_eq!( - tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, + tline + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .await?, RELSEG_SIZE + 1 ); @@ -2113,7 +2124,9 @@ mod tests { .await?; m.commit(&ctx).await?; assert_eq!( - tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, + tline + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .await?, RELSEG_SIZE ); assert_current_logical_size(&tline, Lsn(lsn)); @@ -2126,7 +2139,9 @@ mod tests { .await?; m.commit(&ctx).await?; assert_eq!( - tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, + tline + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .await?, RELSEG_SIZE - 1 ); assert_current_logical_size(&tline, Lsn(lsn)); @@ -2142,7 +2157,9 @@ mod tests { .await?; m.commit(&ctx).await?; assert_eq!( - tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, + tline + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .await?, size as BlockNumber ); @@ -2179,7 +2196,7 @@ mod tests { let wal_segment_path = format!("{path}/000000010000000000000001.zst"); let source_initdb_path = format!("{path}/{INITDB_PATH}"); let startpoint = Lsn::from_hex("14AEC08").unwrap(); - let endpoint = Lsn::from_hex("1FFFF98").unwrap(); + let _endpoint = Lsn::from_hex("1FFFF98").unwrap(); let harness = TenantHarness::create("test_ingest_real_wal").unwrap(); let (tenant, ctx) = harness.load().await; @@ -2221,7 +2238,7 @@ mod tests { let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx) .await .unwrap(); - let mut modification = tline.begin_modification(endpoint); + let mut modification = tline.begin_modification(startpoint); let mut decoded = DecodedWALRecord::default(); println!("decoding {} bytes", bytes.len() - xlogoff); @@ -2235,6 +2252,7 @@ mod tests { .await .unwrap(); } + modification.commit(&ctx).await.unwrap(); } let duration = started_at.elapsed();