diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 1978becf83..f1d92ac36b 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -27,7 +27,7 @@ use tracing::*;
 ///
 use tokio_tar::{Builder, EntryType, Header};
 
-use crate::tenant::{with_ondemand_download, Timeline};
+use crate::tenant::Timeline;
 use pageserver_api::reltag::{RelTag, SlruKind};
 
 use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
@@ -171,30 +171,23 @@
             SlruKind::MultiXactOffsets,
             SlruKind::MultiXactMembers,
         ] {
-            for segno in
-                with_ondemand_download(|| self.timeline.list_slru_segments(kind, self.lsn)).await?
-            {
+            for segno in self.timeline.list_slru_segments(kind, self.lsn).await? {
                 self.add_slru_segment(kind, segno).await?;
             }
         }
 
         // Create tablespace directories
-        for ((spcnode, dbnode), has_relmap_file) in
-            with_ondemand_download(|| self.timeline.list_dbdirs(self.lsn)).await?
-        {
+        for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn).await? {
             self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
 
             // Gather and send relational files in each database if full backup is requested.
             if self.full_backup {
-                for rel in
-                    with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
-                        .await?
-                {
+                for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn).await? {
                     self.add_rel(rel).await?;
                 }
             }
         }
 
-        for xid in with_ondemand_download(|| self.timeline.list_twophase_files(self.lsn)).await? {
+        for xid in self.timeline.list_twophase_files(self.lsn).await? {
            self.add_twophase_file(xid).await?;
         }
 
@@ -210,8 +203,7 @@ where
     }
 
     async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
-        let nblocks =
-            with_ondemand_download(|| self.timeline.get_rel_size(tag, self.lsn, false)).await?;
+        let nblocks = self.timeline.get_rel_size(tag, self.lsn, false).await?;
 
         // If the relation is empty, create an empty file
         if nblocks == 0 {
@@ -229,11 +221,10 @@ where
             let mut segment_data: Vec<u8> = vec![];
             for blknum in startblk..endblk {
-                let img = with_ondemand_download(|| {
-                    self.timeline
-                        .get_rel_page_at_lsn(tag, blknum, self.lsn, false)
-                })
-                .await?;
+                let img = self
+                    .timeline
+                    .get_rel_page_at_lsn(tag, blknum, self.lsn, false)
+                    .await?;
                 segment_data.extend_from_slice(&img[..]);
             }
 
@@ -252,17 +243,17 @@ where
     //
     // Generate SLRU segment files from repository.
     //
     async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
-        let nblocks =
-            with_ondemand_download(|| self.timeline.get_slru_segment_size(slru, segno, self.lsn))
-                .await?;
+        let nblocks = self
+            .timeline
+            .get_slru_segment_size(slru, segno, self.lsn)
+            .await?;
 
         let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
         for blknum in 0..nblocks {
-            let img = with_ondemand_download(|| {
-                self.timeline
-                    .get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
-            })
-            .await?;
+            let img = self
+                .timeline
+                .get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
+                .await?;
 
             if slru == SlruKind::Clog {
                 ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
@@ -294,9 +285,10 @@ where
         has_relmap_file: bool,
     ) -> anyhow::Result<()> {
         let relmap_img = if has_relmap_file {
-            let img =
-                with_ondemand_download(|| self.timeline.get_relmap_file(spcnode, dbnode, self.lsn))
-                    .await?;
+            let img = self
+                .timeline
+                .get_relmap_file(spcnode, dbnode, self.lsn)
+                .await?;
             ensure!(img.len() == 512);
             Some(img)
         } else {
@@ -329,7 +321,9 @@ where
             // XLOG_TBLSPC_DROP records. But we probably should just
            // throw an error on CREATE TABLESPACE in the first place.
            if !has_relmap_file
-                && with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
+                && self
+                    .timeline
+                    .list_rels(spcnode, dbnode, self.lsn)
                     .await?
                     .is_empty()
             {
@@ -362,7 +356,7 @@ where
     // Extract twophase state files
     //
     async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
-        let img = with_ondemand_download(|| self.timeline.get_twophase_file(xid, self.lsn)).await?;
+        let img = self.timeline.get_twophase_file(xid, self.lsn).await?;
 
         let mut buf = BytesMut::new();
         buf.extend_from_slice(&img[..]);
@@ -398,10 +392,14 @@ where
         )
         .await?;
 
-        let checkpoint_bytes = with_ondemand_download(|| self.timeline.get_checkpoint(self.lsn))
+        let checkpoint_bytes = self
+            .timeline
+            .get_checkpoint(self.lsn)
             .await
             .context("failed to get checkpoint bytes")?;
-        let pg_control_bytes = with_ondemand_download(|| self.timeline.get_control_file(self.lsn))
+        let pg_control_bytes = self
+            .timeline
+            .get_control_file(self.lsn)
             .await
             .context("failed to get control bytes")?;
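Note: the basebackup hunks above are representative of the whole patch. Each read-path
accessor on Timeline becomes async and performs any needed on-demand layer download
internally, so the `with_ondemand_download` retry wrapper disappears at every call site.
A minimal before/after sketch of the calling convention (the caller and variable names
here are illustrative, not taken from the patch itself):

    // Before: sync getter, retried through the on-demand-download helper.
    let img = with_ondemand_download(|| timeline.get_rel_page_at_lsn(tag, blknum, lsn, false)).await?;

    // After: the getter is async and downloads missing remote layers itself.
    let img = timeline.get_rel_page_at_lsn(tag, blknum, lsn, false).await?;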
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 1c5eacd362..f522b2b949 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -13,7 +13,7 @@ use super::models::{
 };
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::tenant::config::TenantConfOpt;
-use crate::tenant::{with_ondemand_download, Timeline};
+use crate::tenant::{PageReconstructError, Timeline};
 use crate::{config::PageServerConf, tenant::mgr};
 use utils::{
     auth::JwtAuth,
@@ -77,6 +77,15 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
     })
 }
 
+fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
+    match err {
+        PageReconstructError::Other(err) => ApiError::InternalServerError(err),
+        PageReconstructError::WalRedo(err) => {
+            ApiError::InternalServerError(anyhow::Error::new(err))
+        }
+    }
+}
+
 // Helper function to construct a TimelineInfo struct for a timeline
 async fn build_timeline_info(
     timeline: &Arc<Timeline>,
@@ -298,9 +307,10 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Res
-    let result = match with_ondemand_download(|| timeline.find_lsn_for_timestamp(timestamp))
-        .await
-        .map_err(ApiError::InternalServerError)?
+    let result = match timeline
+        .find_lsn_for_timestamp(timestamp)
+        .await
+        .map_err(apierror_from_prerror)?
     {
         LsnForTimestamp::Present(lsn) => format!("{lsn}"),
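Note: `apierror_from_prerror` above is the HTTP layer's only view of the new error type.
Judging from its match arms (the enum itself is only partially visible in the timeline.rs
hunks below, which show the `#[derive(thiserror::Error)]` and a manual Debug impl),
`PageReconstructError` has roughly the following shape. The payload type of `WalRedo` is
not shown anywhere in this patch, so `WalRedoError` is a stand-in name:

    // Sketch only, not the literal definition from the patch.
    #[derive(thiserror::Error)]
    pub enum PageReconstructError {
        #[error(transparent)]
        Other(#[from] anyhow::Error),   // generic reconstruction failure
        #[error("WAL redo error: {0}")]
        WalRedo(WalRedoError),          // error from the WAL-redo machinery (name assumed)
    }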
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index ca1514dd00..8c9c7120c0 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -143,7 +143,7 @@ async fn import_rel(
     // Call put_rel_creation for every segment of the relation,
     // because there is no guarantee about the order in which we are processing segments.
     // ignore "relation already exists" error
-    if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
+    if let Err(e) = modification.put_rel_creation(rel, nblocks as u32).await {
         if e.to_string().contains("already exists") {
             debug!("relation {} already exists. we must be extending it", rel);
         } else {
@@ -178,7 +178,7 @@ async fn import_rel(
     //
     // If we process rel segments out of order,
     // put_rel_extend will skip the update.
-    modification.put_rel_extend(rel, blknum)?;
+    modification.put_rel_extend(rel, blknum).await?;
 
     Ok(())
 }
@@ -206,7 +206,9 @@ async fn import_slru(
 
     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
 
-    modification.put_slru_segment_creation(slru, segno, nblocks as u32)?;
+    modification
+        .put_slru_segment_creation(slru, segno, nblocks as u32)
+        .await?;
 
     let mut rpageno = 0;
     loop {
@@ -492,7 +494,7 @@ async fn import_file(
         }
         "pg_filenode.map" => {
             let bytes = read_all_bytes(reader).await?;
-            modification.put_relmap_file(spcnode, dbnode, bytes)?;
+            modification.put_relmap_file(spcnode, dbnode, bytes).await?;
             debug!("imported relmap file")
         }
         "PG_VERSION" => {
@@ -515,7 +517,7 @@ async fn import_file(
         match file_name.as_ref() {
             "pg_filenode.map" => {
                 let bytes = read_all_bytes(reader).await?;
-                modification.put_relmap_file(spcnode, dbnode, bytes)?;
+                modification.put_relmap_file(spcnode, dbnode, bytes).await?;
                 debug!("imported relmap file")
             }
             "PG_VERSION" => {
@@ -545,7 +547,9 @@ async fn import_file(
         let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
 
         let bytes = read_all_bytes(reader).await?;
-        modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
+        modification
+            .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))
+            .await?;
         debug!("imported twophase file");
     } else if file_path.starts_with("pg_wal") {
         debug!("found wal file in base section. ignore it");
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b266a07337..ca69bf04a7 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -546,10 +546,7 @@ impl PageServerHandler {
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
             .await?;
 
-        let exists = crate::tenant::with_ondemand_download(|| {
-            timeline.get_rel_exists(req.rel, lsn, req.latest)
-        })
-        .await?;
+        let exists = timeline.get_rel_exists(req.rel, lsn, req.latest).await?;
 
         Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
             exists,
@@ -566,10 +563,7 @@ impl PageServerHandler {
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
             .await?;
 
-        let n_blocks = crate::tenant::with_ondemand_download(|| {
-            timeline.get_rel_size(req.rel, lsn, req.latest)
-        })
-        .await?;
+        let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest).await?;
 
         Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
             n_blocks,
@@ -586,10 +580,9 @@ impl PageServerHandler {
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
             .await?;
 
-        let total_blocks = crate::tenant::with_ondemand_download(|| {
-            timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
-        })
-        .await?;
+        let total_blocks = timeline
+            .get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
+            .await?;
         let db_size = total_blocks as i64 * BLCKSZ as i64;
 
         Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
@@ -615,10 +608,9 @@ impl PageServerHandler {
         }
         */
 
-        let page = crate::tenant::with_ondemand_download(|| {
-            timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
-        })
-        .await?;
+        let page = timeline
+            .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
+            .await?;
 
         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
             page,
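Note: the page_service handlers return anyhow::Result, so a plain `?` on the new
`Result<_, PageReconstructError>` values compiles without any glue code: thiserror-derived
errors implement std::error::Error, and anyhow::Error converts from any such type. A
self-contained sketch of the pattern (the function and its signature are hypothetical):

    // `?` converts PageReconstructError -> anyhow::Error automatically.
    async fn rel_exists_demo(timeline: &Timeline, rel: RelTag, lsn: Lsn) -> anyhow::Result<bool> {
        let exists = timeline.get_rel_exists(rel, lsn, false).await?;
        Ok(exists)
    }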
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 82b1576145..6ae70e3a30 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -6,11 +6,10 @@
 //! walingest.rs handles a few things like implicit relation creation and extension.
 //! Clarify that)
 //!
-use super::tenant::PageReconstructResult;
+use super::tenant::{PageReconstructError, Timeline};
 use crate::keyspace::{KeySpace, KeySpaceAccum};
-use crate::tenant::{with_ondemand_download, Timeline};
+use crate::repository::*;
 use crate::walrecord::NeonWalRecord;
-use crate::{repository::*, try_no_ondemand_download};
 use anyhow::Context;
 use bytes::{Buf, Bytes};
 use pageserver_api::reltag::{RelTag, SlruKind};
@@ -92,76 +91,80 @@ impl Timeline {
     //------------------------------------------------------------------------------
 
     /// Look up given page version.
-    pub fn get_rel_page_at_lsn(
+    pub async fn get_rel_page_at_lsn(
         &self,
         tag: RelTag,
         blknum: BlockNumber,
         lsn: Lsn,
         latest: bool,
-    ) -> PageReconstructResult<Bytes> {
+    ) -> Result<Bytes, PageReconstructError> {
         if tag.relnode == 0 {
-            return PageReconstructResult::from(anyhow::anyhow!("invalid relnode"));
+            return Err(PageReconstructError::Other(anyhow::anyhow!(
+                "invalid relnode"
+            )));
         }
 
-        let nblocks = try_no_ondemand_download!(self.get_rel_size(tag, lsn, latest));
+        let nblocks = self.get_rel_size(tag, lsn, latest).await?;
         if blknum >= nblocks {
             debug!(
                 "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                 tag, blknum, lsn, nblocks
             );
-            return PageReconstructResult::Success(ZERO_PAGE.clone());
+            return Ok(ZERO_PAGE.clone());
         }
 
         let key = rel_block_to_key(tag, blknum);
-        self.get(key, lsn)
+        self.get(key, lsn).await
     }
 
     // Get size of a database in blocks
-    pub fn get_db_size(
+    pub async fn get_db_size(
         &self,
         spcnode: Oid,
         dbnode: Oid,
         lsn: Lsn,
         latest: bool,
-    ) -> PageReconstructResult<usize> {
+    ) -> Result<usize, PageReconstructError> {
         let mut total_blocks = 0;
 
-        let rels = try_no_ondemand_download!(self.list_rels(spcnode, dbnode, lsn));
+        let rels = self.list_rels(spcnode, dbnode, lsn).await?;
 
         for rel in rels {
-            let n_blocks = try_no_ondemand_download!(self.get_rel_size(rel, lsn, latest));
+            let n_blocks = self.get_rel_size(rel, lsn, latest).await?;
             total_blocks += n_blocks as usize;
         }
-        PageReconstructResult::Success(total_blocks)
+        Ok(total_blocks)
     }
 
     /// Get size of a relation file
-    pub fn get_rel_size(
+    pub async fn get_rel_size(
         &self,
         tag: RelTag,
         lsn: Lsn,
         latest: bool,
-    ) -> PageReconstructResult<BlockNumber> {
+    ) -> Result<BlockNumber, PageReconstructError> {
         if tag.relnode == 0 {
-            return PageReconstructResult::from(anyhow::anyhow!("invalid relnode"));
+            return Err(PageReconstructError::Other(anyhow::anyhow!(
+                "invalid relnode"
+            )));
        }
 
         if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
-            return PageReconstructResult::Success(nblocks);
+            return Ok(nblocks);
        }
 
         if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
-            && !try_no_ondemand_download!(self.get_rel_exists(tag, lsn, latest))
+            && !self.get_rel_exists(tag, lsn, latest).await?
         {
             // FIXME: Postgres sometimes calls smgrcreate() to create
             // FSM, and smgrnblocks() on it immediately afterwards,
             // without extending it. Tolerate that by claiming that
             // any non-existent FSM fork has size 0.
-            return PageReconstructResult::Success(0);
+            return Ok(0);
         }
 
         let key = rel_size_to_key(tag);
-        let mut buf = try_no_ondemand_download!(self.get(key, lsn));
+        let mut buf = self.get(key, lsn).await?;
         let nblocks = buf.get_u32_le();
 
         if latest {
@@ -174,47 +177,49 @@ impl Timeline {
             // associated with most recent value of LSN.
             self.update_cached_rel_size(tag, lsn, nblocks);
         }
-        PageReconstructResult::Success(nblocks)
+        Ok(nblocks)
     }
 
     /// Does relation exist?
-    pub fn get_rel_exists(
+    pub async fn get_rel_exists(
         &self,
         tag: RelTag,
         lsn: Lsn,
         _latest: bool,
-    ) -> PageReconstructResult<bool> {
+    ) -> Result<bool, PageReconstructError> {
         if tag.relnode == 0 {
-            return PageReconstructResult::from(anyhow::anyhow!("invalid relnode"));
+            return Err(PageReconstructError::Other(anyhow::anyhow!(
+                "invalid relnode"
+            )));
         }
 
         // first try to lookup relation in cache
         if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
-            return PageReconstructResult::Success(true);
+            return Ok(true);
         }
         // fetch directory listing
         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
-        let buf = try_no_ondemand_download!(self.get(key, lsn));
+        let buf = self.get(key, lsn).await?;
 
         match RelDirectory::des(&buf).context("deserialization failure") {
             Ok(dir) => {
                 let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
-                PageReconstructResult::Success(exists)
+                Ok(exists)
             }
-            Err(e) => PageReconstructResult::from(e),
+            Err(e) => Err(PageReconstructError::from(e)),
         }
     }
 
     /// Get a list of all existing relations in given tablespace and database.
-    pub fn list_rels(
+    pub async fn list_rels(
         &self,
         spcnode: Oid,
         dbnode: Oid,
         lsn: Lsn,
-    ) -> PageReconstructResult<HashSet<RelTag>> {
+    ) -> Result<HashSet<RelTag>, PageReconstructError> {
         // fetch directory listing
         let key = rel_dir_to_key(spcnode, dbnode);
-        let buf = try_no_ondemand_download!(self.get(key, lsn));
+        let buf = self.get(key, lsn).await?;
 
         match RelDirectory::des(&buf).context("deserialization failure") {
             Ok(dir) => {
@@ -226,53 +231,53 @@ impl Timeline {
                     forknum: *forknum,
                 }));
 
-                PageReconstructResult::Success(rels)
+                Ok(rels)
             }
-            Err(e) => PageReconstructResult::from(e),
+            Err(e) => Err(PageReconstructError::from(e)),
         }
     }
 
     /// Look up given SLRU page version.
-    pub fn get_slru_page_at_lsn(
+    pub async fn get_slru_page_at_lsn(
         &self,
         kind: SlruKind,
         segno: u32,
         blknum: BlockNumber,
         lsn: Lsn,
-    ) -> PageReconstructResult<Bytes> {
+    ) -> Result<Bytes, PageReconstructError> {
         let key = slru_block_to_key(kind, segno, blknum);
-        self.get(key, lsn)
+        self.get(key, lsn).await
     }
 
     /// Get size of an SLRU segment
-    pub fn get_slru_segment_size(
+    pub async fn get_slru_segment_size(
         &self,
         kind: SlruKind,
         segno: u32,
         lsn: Lsn,
-    ) -> PageReconstructResult<BlockNumber> {
+    ) -> Result<BlockNumber, PageReconstructError> {
         let key = slru_segment_size_to_key(kind, segno);
-        let mut buf = try_no_ondemand_download!(self.get(key, lsn));
-        PageReconstructResult::Success(buf.get_u32_le())
+        let mut buf = self.get(key, lsn).await?;
+        Ok(buf.get_u32_le())
     }
 
     /// Get size of an SLRU segment
-    pub fn get_slru_segment_exists(
+    pub async fn get_slru_segment_exists(
         &self,
         kind: SlruKind,
         segno: u32,
         lsn: Lsn,
-    ) -> PageReconstructResult<bool> {
+    ) -> Result<bool, PageReconstructError> {
         // fetch directory listing
         let key = slru_dir_to_key(kind);
-        let buf = try_no_ondemand_download!(self.get(key, lsn));
+        let buf = self.get(key, lsn).await?;
 
         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
             Ok(dir) => {
                 let exists = dir.segments.get(&segno).is_some();
-                PageReconstructResult::Success(exists)
+                Ok(exists)
             }
-            Err(e) => PageReconstructResult::from(e),
+            Err(e) => Err(PageReconstructError::from(e)),
         }
     }
 
@@ -283,10 +288,10 @@ impl Timeline {
     /// so it's not well defined which LSN you get if there were multiple commits
     /// "in flight" at that point in time.
     ///
-    pub fn find_lsn_for_timestamp(
+    pub async fn find_lsn_for_timestamp(
         &self,
         search_timestamp: TimestampTz,
-    ) -> PageReconstructResult<LsnForTimestamp> {
+    ) -> Result<LsnForTimestamp, PageReconstructError> {
         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
         let min_lsn = *gc_cutoff_lsn_guard;
         let max_lsn = self.get_last_record_lsn();
@@ -302,12 +307,14 @@ impl Timeline {
             // cannot overflow, high and low are both smaller than u64::MAX / 2
             let mid = (high + low) / 2;
 
-            let cmp = try_no_ondemand_download!(self.is_latest_commit_timestamp_ge_than(
-                search_timestamp,
-                Lsn(mid * 8),
-                &mut found_smaller,
-                &mut found_larger,
-            ));
+            let cmp = self
+                .is_latest_commit_timestamp_ge_than(
+                    search_timestamp,
+                    Lsn(mid * 8),
+                    &mut found_smaller,
+                    &mut found_larger,
+                )
+                .await?;
 
             if cmp {
                 high = mid;
@@ -319,15 +326,15 @@ impl Timeline {
             (false, false) => {
                 // This can happen if no commit records have been processed yet, e.g.
                 // just after importing a cluster.
-                PageReconstructResult::Success(LsnForTimestamp::NoData(max_lsn))
+                Ok(LsnForTimestamp::NoData(max_lsn))
             }
             (true, false) => {
                 // Didn't find any commit timestamps larger than the request
-                PageReconstructResult::Success(LsnForTimestamp::Future(max_lsn))
+                Ok(LsnForTimestamp::Future(max_lsn))
             }
             (false, true) => {
                 // Didn't find any commit timestamps smaller than the request
-                PageReconstructResult::Success(LsnForTimestamp::Past(max_lsn))
+                Ok(LsnForTimestamp::Past(max_lsn))
             }
             (true, true) => {
                 // low is the LSN of the first commit record *after* the search_timestamp,
@@ -337,7 +344,7 @@ impl Timeline {
                 // Otherwise, if you restore to the returned LSN, the database will
                 // include physical changes from later commits that will be marked
                 // as aborted, and will need to be vacuumed away.
-                PageReconstructResult::Success(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
+                Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
             }
         }
     }
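Note: `find_lsn_for_timestamp` above is a binary search over LSNs in 8-byte units;
`low`/`high` bracket the first commit whose timestamp is >= `search_timestamp`. A
compressed restatement of the loop, using the same variable names (sketch, comments only):

    // Invariant: all commits at LSN <  low*8  have timestamp <  search_timestamp,
    //            all commits at LSN >= high*8 have timestamp >= search_timestamp.
    // The loop ends with low == high, so Present(Lsn((low - 1) * 8)) is the last
    // LSN at which every visible commit is still <= the requested timestamp.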
@@ -349,26 +356,21 @@ impl Timeline {
     /// Additionally, sets 'found_smaller'/'found_larger', if it encounters any commits
     /// with a smaller/larger timestamp.
     ///
-    pub fn is_latest_commit_timestamp_ge_than(
+    pub async fn is_latest_commit_timestamp_ge_than(
         &self,
         search_timestamp: TimestampTz,
         probe_lsn: Lsn,
         found_smaller: &mut bool,
         found_larger: &mut bool,
-    ) -> PageReconstructResult<bool> {
-        for segno in try_no_ondemand_download!(self.list_slru_segments(SlruKind::Clog, probe_lsn)) {
-            let nblocks = try_no_ondemand_download!(self.get_slru_segment_size(
-                SlruKind::Clog,
-                segno,
-                probe_lsn
-            ));
+    ) -> Result<bool, PageReconstructError> {
+        for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn).await? {
+            let nblocks = self
+                .get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)
+                .await?;
             for blknum in (0..nblocks).rev() {
-                let clog_page = try_no_ondemand_download!(self.get_slru_page_at_lsn(
-                    SlruKind::Clog,
-                    segno,
-                    blknum,
-                    probe_lsn
-                ));
+                let clog_page = self
+                    .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)
+                    .await?;
 
                 if clog_page.len() == BLCKSZ as usize + 8 {
                     let mut timestamp_bytes = [0u8; 8];
@@ -377,76 +379,85 @@ impl Timeline {
 
                     if timestamp >= search_timestamp {
                         *found_larger = true;
-                        return PageReconstructResult::Success(true);
+                        return Ok(true);
                     } else {
                         *found_smaller = true;
                     }
                 }
             }
         }
-        PageReconstructResult::Success(false)
+        Ok(false)
     }
 
     /// Get a list of SLRU segments
-    pub fn list_slru_segments(
+    pub async fn list_slru_segments(
         &self,
         kind: SlruKind,
         lsn: Lsn,
-    ) -> PageReconstructResult<HashSet<u32>> {
+    ) -> Result<HashSet<u32>, PageReconstructError> {
         // fetch directory entry
         let key = slru_dir_to_key(kind);
 
-        let buf = try_no_ondemand_download!(self.get(key, lsn));
+        let buf = self.get(key, lsn).await?;
         match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
-            Ok(dir) => PageReconstructResult::Success(dir.segments),
-            Err(e) => PageReconstructResult::from(e),
+            Ok(dir) => Ok(dir.segments),
+            Err(e) => Err(PageReconstructError::from(e)),
         }
     }
 
-    pub fn get_relmap_file(
+    pub async fn get_relmap_file(
         &self,
         spcnode: Oid,
         dbnode: Oid,
         lsn: Lsn,
-    ) -> PageReconstructResult<Bytes> {
+    ) -> Result<Bytes, PageReconstructError> {
         let key = relmap_file_key(spcnode, dbnode);
 
-        let buf = try_no_ondemand_download!(self.get(key, lsn));
-        PageReconstructResult::Success(buf)
+        self.get(key, lsn).await
     }
 
-    pub fn list_dbdirs(&self, lsn: Lsn) -> PageReconstructResult<HashMap<(Oid, Oid), bool>> {
+    pub async fn list_dbdirs(
+        &self,
+        lsn: Lsn,
+    ) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
         // fetch directory entry
-        let buf = try_no_ondemand_download!(self.get(DBDIR_KEY, lsn));
+        let buf = self.get(DBDIR_KEY, lsn).await?;
 
         match DbDirectory::des(&buf).context("deserialization failure") {
-            Ok(dir) => PageReconstructResult::Success(dir.dbdirs),
-            Err(e) => PageReconstructResult::from(e),
+            Ok(dir) => Ok(dir.dbdirs),
+            Err(e) => Err(PageReconstructError::from(e)),
         }
     }
 
-    pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> PageReconstructResult<Bytes> {
+    pub async fn get_twophase_file(
+        &self,
+        xid: TransactionId,
+        lsn: Lsn,
+    ) -> Result<Bytes, PageReconstructError> {
         let key = twophase_file_key(xid);
-        let buf = try_no_ondemand_download!(self.get(key, lsn));
-        PageReconstructResult::Success(buf)
+        let buf = self.get(key, lsn).await?;
+        Ok(buf)
     }
 
-    pub fn list_twophase_files(&self, lsn: Lsn) -> PageReconstructResult<HashSet<TransactionId>> {
+    pub async fn list_twophase_files(
+        &self,
+        lsn: Lsn,
+    ) -> Result<HashSet<TransactionId>, PageReconstructError> {
         // fetch directory entry
-        let buf = try_no_ondemand_download!(self.get(TWOPHASEDIR_KEY, lsn));
+        let buf = self.get(TWOPHASEDIR_KEY, lsn).await?;
 
         match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
-            Ok(dir) => PageReconstructResult::Success(dir.xids),
-            Err(e) => PageReconstructResult::from(e),
+            Ok(dir) => Ok(dir.xids),
+            Err(e) => Err(PageReconstructError::from(e)),
        }
    }
 
-    pub fn get_control_file(&self, lsn: Lsn) -> PageReconstructResult<Bytes> {
-        self.get(CONTROLFILE_KEY, lsn)
+    pub async fn get_control_file(&self, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
+        self.get(CONTROLFILE_KEY, lsn).await
     }
 
-    pub fn get_checkpoint(&self, lsn: Lsn) -> PageReconstructResult<Bytes> {
-        self.get(CHECKPOINT_KEY, lsn)
+    pub async fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
+        self.get(CHECKPOINT_KEY, lsn).await
     }
 
     /// Does the same as get_current_logical_size but counted on demand.
@@ -460,20 +471,24 @@ impl Timeline {
         cancel: CancellationToken,
     ) -> Result<u64, CalculateLogicalSizeError> {
         // Fetch list of database dirs and iterate them
-        let buf = self.get_download(DBDIR_KEY, lsn).await?;
+        let buf = self.get(DBDIR_KEY, lsn).await.context("read dbdir")?;
         let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
 
         let mut total_size: u64 = 0;
         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
-            for rel in
-                crate::tenant::with_ondemand_download(|| self.list_rels(*spcnode, *dbnode, lsn))
-                    .await?
+            for rel in self
+                .list_rels(*spcnode, *dbnode, lsn)
+                .await
+                .context("list rels")?
             {
                 if cancel.is_cancelled() {
                     return Err(CalculateLogicalSizeError::Cancelled);
                 }
                 let relsize_key = rel_size_to_key(rel);
-                let mut buf = self.get_download(relsize_key, lsn).await?;
+                let mut buf = self
+                    .get(relsize_key, lsn)
+                    .await
+                    .with_context(|| format!("read relation size of {rel:?}"))?;
                 let relsize = buf.get_u32_le();
 
                 total_size += relsize as u64;
@@ -494,7 +509,7 @@ impl Timeline {
         result.add_key(DBDIR_KEY);
 
         // Fetch list of database dirs and iterate them
-        let buf = self.get_download(DBDIR_KEY, lsn).await?;
+        let buf = self.get(DBDIR_KEY, lsn).await?;
         let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
 
         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
@@ -503,15 +518,15 @@ impl Timeline {
             result.add_key(relmap_file_key(spcnode, dbnode));
             result.add_key(rel_dir_to_key(spcnode, dbnode));
 
-            let mut rels: Vec<RelTag> =
-                with_ondemand_download(|| self.list_rels(spcnode, dbnode, lsn))
-                    .await?
-                    .into_iter()
-                    .collect();
+            let mut rels: Vec<RelTag> = self
+                .list_rels(spcnode, dbnode, lsn)
+                .await?
+                .into_iter()
+                .collect();
             rels.sort_unstable();
             for rel in rels {
                 let relsize_key = rel_size_to_key(rel);
-                let mut buf = self.get_download(relsize_key, lsn).await?;
+                let mut buf = self.get(relsize_key, lsn).await?;
                 let relsize = buf.get_u32_le();
 
                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
@@ -527,13 +542,13 @@ impl Timeline {
         ] {
             let slrudir_key = slru_dir_to_key(kind);
             result.add_key(slrudir_key);
-            let buf = self.get_download(slrudir_key, lsn).await?;
+            let buf = self.get(slrudir_key, lsn).await?;
             let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
             segments.sort_unstable();
             for segno in segments {
                 let segsize_key = slru_segment_size_to_key(kind, segno);
-                let mut buf = self.get_download(segsize_key, lsn).await?;
+                let mut buf = self.get(segsize_key, lsn).await?;
                 let segsize = buf.get_u32_le();
 
                 result.add_range(
@@ -545,7 +560,7 @@ impl Timeline {
 
         // Then pg_twophase
         result.add_key(TWOPHASEDIR_KEY);
-        let buf = self.get_download(TWOPHASEDIR_KEY, lsn).await?;
+        let buf = self.get(TWOPHASEDIR_KEY, lsn).await?;
         let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
         xids.sort_unstable();
@@ -703,9 +718,14 @@ impl<'a> DatadirModification<'a> {
     }
 
     /// Store a relmapper file (pg_filenode.map) in the repository
-    pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> anyhow::Result<()> {
+    pub async fn put_relmap_file(
+        &mut self,
+        spcnode: Oid,
+        dbnode: Oid,
+        img: Bytes,
+    ) -> anyhow::Result<()> {
         // Add it to the directory (if it doesn't exist already)
-        let buf = self.get(DBDIR_KEY).no_ondemand_download()?;
+        let buf = self.get(DBDIR_KEY).await?;
         let mut dbdir = DbDirectory::des(&buf)?;
         let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
@@ -731,9 +751,13 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
-    pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> anyhow::Result<()> {
+    pub async fn put_twophase_file(
+        &mut self,
+        xid: TransactionId,
+        img: Bytes,
+    ) -> anyhow::Result<()> {
         // Add it to the directory entry
-        let buf = self.get(TWOPHASEDIR_KEY).no_ondemand_download()?;
+        let buf = self.get(TWOPHASEDIR_KEY).await?;
         let mut dir = TwoPhaseDirectory::des(&buf)?;
         if !dir.xids.insert(xid) {
             anyhow::bail!("twophase file for xid {} already exists", xid);
@@ -757,16 +781,16 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
-    pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
+    pub async fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
         let req_lsn = self.tline.get_last_record_lsn();
 
         let total_blocks = self
             .tline
             .get_db_size(spcnode, dbnode, req_lsn, true)
-            .no_ondemand_download()?;
+            .await?;
 
         // Remove entry from dbdir
-        let buf = self.get(DBDIR_KEY).no_ondemand_download()?;
+        let buf = self.get(DBDIR_KEY).await?;
         let mut dir = DbDirectory::des(&buf)?;
         if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
             let buf = DbDirectory::ser(&dir)?;
@@ -789,11 +813,15 @@ impl<'a> DatadirModification<'a> {
     /// Create a relation fork.
     ///
     /// 'nblocks' is the initial size.
-    pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
+    pub async fn put_rel_creation(
+        &mut self,
+        rel: RelTag,
+        nblocks: BlockNumber,
+    ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, "invalid relnode");
         // It's possible that this is the first rel for this db in this
         // tablespace. Create the reldir entry for it if so.
-        let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY).no_ondemand_download()?)?;
+        let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY).await?)?;
         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
         let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
             // Didn't exist. Update dbdir
@@ -805,7 +833,7 @@ impl<'a> DatadirModification<'a> {
             RelDirectory::default()
         } else {
             // reldir already exists, fetch it
-            RelDirectory::des(&self.get(rel_dir_key).no_ondemand_download()?)?
+            RelDirectory::des(&self.get(rel_dir_key).await?)?
         };
 
         // Add the new relation to the rel directory entry, and write it back
@@ -833,17 +861,17 @@ impl<'a> DatadirModification<'a> {
     }
 
     /// Truncate relation
-    pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
+    pub async fn put_rel_truncation(
+        &mut self,
+        rel: RelTag,
+        nblocks: BlockNumber,
+    ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, "invalid relnode");
         let last_lsn = self.tline.get_last_record_lsn();
-        if self
-            .tline
-            .get_rel_exists(rel, last_lsn, true)
-            .no_ondemand_download()?
-        {
+        if self.tline.get_rel_exists(rel, last_lsn, true).await? {
             let size_key = rel_size_to_key(rel);
             // Fetch the old size first
-            let old_size = self.get(size_key).no_ondemand_download()?.get_u32_le();
+            let old_size = self.get(size_key).await?.get_u32_le();
 
             // Update the entry with the new size.
             let buf = nblocks.to_le_bytes();
@@ -863,12 +891,16 @@ impl<'a> DatadirModification<'a> {
 
     /// Extend relation
     /// If new size is smaller, do nothing.
-    pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
+    pub async fn put_rel_extend(
+        &mut self,
+        rel: RelTag,
+        nblocks: BlockNumber,
+    ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, "invalid relnode");
 
         // Put size
         let size_key = rel_size_to_key(rel);
-        let old_size = self.get(size_key).no_ondemand_download()?.get_u32_le();
+        let old_size = self.get(size_key).await?.get_u32_le();
 
         // only extend relation here. never decrease the size
         if nblocks > old_size {
@@ -884,12 +916,12 @@ impl<'a> DatadirModification<'a> {
     }
 
     /// Drop a relation.
-    pub fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> {
+    pub async fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, "invalid relnode");
 
         // Remove it from the directory entry
         let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
-        let buf = self.get(dir_key).no_ondemand_download()?;
+        let buf = self.get(dir_key).await?;
         let mut dir = RelDirectory::des(&buf)?;
 
         if dir.rels.remove(&(rel.relnode, rel.forknum)) {
@@ -900,7 +932,7 @@ impl<'a> DatadirModification<'a> {
 
         // update logical size
         let size_key = rel_size_to_key(rel);
-        let old_size = self.get(size_key).no_ondemand_download()?.get_u32_le();
+        let old_size = self.get(size_key).await?.get_u32_le();
         self.pending_nblocks -= old_size as i64;
 
         // Remove entry from relation size cache
@@ -912,7 +944,7 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
-    pub fn put_slru_segment_creation(
+    pub async fn put_slru_segment_creation(
         &mut self,
         kind: SlruKind,
         segno: u32,
@@ -920,7 +952,7 @@ impl<'a> DatadirModification<'a> {
     ) -> anyhow::Result<()> {
         // Add it to the directory entry
         let dir_key = slru_dir_to_key(kind);
-        let buf = self.get(dir_key).no_ondemand_download()?;
+        let buf = self.get(dir_key).await?;
         let mut dir = SlruSegmentDirectory::des(&buf)?;
 
         if !dir.segments.insert(segno) {
@@ -956,10 +988,10 @@ impl<'a> DatadirModification<'a> {
     }
 
     /// This method is used for marking truncated SLRU files
-    pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> {
+    pub async fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> {
         // Remove it from the directory entry
         let dir_key = slru_dir_to_key(kind);
-        let buf = self.get(dir_key).no_ondemand_download()?;
+        let buf = self.get(dir_key).await?;
         let mut dir = SlruSegmentDirectory::des(&buf)?;
 
         if !dir.segments.remove(&segno) {
@@ -983,9 +1015,9 @@ impl<'a> DatadirModification<'a> {
     }
 
     /// This method is used for marking truncated SLRU files
-    pub fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
+    pub async fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
         // Remove it from the directory entry
-        let buf = self.get(TWOPHASEDIR_KEY).no_ondemand_download()?;
+        let buf = self.get(TWOPHASEDIR_KEY).await?;
         let mut dir = TwoPhaseDirectory::des(&buf)?;
 
         if !dir.xids.remove(&xid) {
@@ -1079,7 +1111,7 @@ impl<'a> DatadirModification<'a> {
 
     // Internal helper functions to batch the modifications
 
-    fn get(&self, key: Key) -> PageReconstructResult<Bytes> {
+    async fn get(&self, key: Key) -> Result<Bytes, PageReconstructError> {
         // Have we already updated the same key? Read the pending updated
         // version in that case.
         //
@@ -1087,18 +1119,20 @@ impl<'a> DatadirModification<'a> {
         // value that has been removed, deletion only avoids leaking storage.
         if let Some(value) = self.pending_updates.get(&key) {
             if let Value::Image(img) = value {
-                PageReconstructResult::Success(img.clone())
+                Ok(img.clone())
             } else {
                 // Currently, we never need to read back a WAL record that we
                 // inserted in the same "transaction". All the metadata updates
                 // work directly with Images, and we never need to read actual
                 // data pages. We could handle this if we had to, by calling
                 // the walredo manager, but let's keep it simple for now.
-                PageReconstructResult::from(anyhow::anyhow!("unexpected pending WAL record"))
+                Err(PageReconstructError::from(anyhow::anyhow!(
+                    "unexpected pending WAL record"
+                )))
             }
         } else {
             let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
-            self.tline.get(key, lsn)
+            self.tline.get(key, lsn).await
         }
     }
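Note: `DatadirModification::get` above keeps its read-your-writes behaviour: a pending
`Value::Image` buffered in the same open modification wins over whatever the timeline has
committed, and only the fallthrough path changed from a sync `tline.get` to an `.await`.
A sketch of the intended behaviour (the method names on the modification are illustrative):

    // Within one open DatadirModification:
    // modification.put(key, Value::Image(img_a));        // buffered only
    // assert_eq!(modification.get(key).await?, img_a);   // sees the pending image
    // modification.commit()?;                            // now visible via timeline.get()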
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index d74f263f08..d1d1efac87 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -92,7 +92,7 @@ mod timeline;
 
 pub mod size;
 
-pub use timeline::{with_ondemand_download, PageReconstructError, PageReconstructResult, Timeline};
+pub use timeline::{PageReconstructError, Timeline};
 
 // re-export this function so that page_cache.rs can use it.
 pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -2816,15 +2816,15 @@ mod tests {
         drop(writer);
 
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x10)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x10)).await?,
             TEST_IMG("foo at 0x10")
         );
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x1f)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x1f)).await?,
             TEST_IMG("foo at 0x10")
         );
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x20)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x20)).await?,
             TEST_IMG("foo at 0x20")
         );
 
@@ -2903,15 +2903,15 @@ mod tests {
 
         // Check page contents on both branches
         assert_eq!(
-            from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40)).no_ondemand_download()?)?,
+            from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40)).await?)?,
             "foo at 0x40"
         );
         assert_eq!(
-            from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40)).no_ondemand_download()?)?,
+            from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40)).await?)?,
             "bar at 0x40"
         );
         assert_eq!(
-            from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40)).no_ondemand_download()?)?,
+            from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40)).await?)?,
             "foobar at 0x20"
         );
 
@@ -3070,10 +3070,7 @@ mod tests {
         tenant
             .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
             .await?;
-        assert!(newtline
-            .get(*TEST_KEY, Lsn(0x25))
-            .no_ondemand_download()
-            .is_ok());
+        assert!(newtline.get(*TEST_KEY, Lsn(0x25)).await.is_ok());
 
         Ok(())
     }
@@ -3103,7 +3100,7 @@ mod tests {
 
         // Check that the data is still accessible on the branch.
         assert_eq!(
-            newtline.get(*TEST_KEY, Lsn(0x50)).no_ondemand_download()?,
+            newtline.get(*TEST_KEY, Lsn(0x50)).await?,
             TEST_IMG(&format!("foo at {}", Lsn(0x40)))
         );
 
@@ -3251,23 +3248,23 @@ mod tests {
         tline.compact().await?;
 
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x10)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x10)).await?,
             TEST_IMG("foo at 0x10")
         );
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x1f)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x1f)).await?,
             TEST_IMG("foo at 0x10")
         );
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x20)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x20)).await?,
             TEST_IMG("foo at 0x20")
         );
         assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x30)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x30)).await?,
             TEST_IMG("foo at 0x30")
        );
        assert_eq!(
-            tline.get(*TEST_KEY, Lsn(0x40)).no_ondemand_download()?,
+            tline.get(*TEST_KEY, Lsn(0x40)).await?,
             TEST_IMG("foo at 0x40")
         );
 
@@ -3377,7 +3374,7 @@ mod tests {
         for (blknum, last_lsn) in updated.iter().enumerate() {
             test_key.field6 = blknum as u32;
             assert_eq!(
-                tline.get(test_key, lsn).no_ondemand_download()?,
+                tline.get(test_key, lsn).await?,
                 TEST_IMG(&format!("{} at {}", blknum, last_lsn))
             );
         }
@@ -3463,7 +3460,7 @@ mod tests {
         for (blknum, last_lsn) in updated.iter().enumerate() {
             test_key.field6 = blknum as u32;
             assert_eq!(
-                tline.get(test_key, lsn).no_ondemand_download()?,
+                tline.get(test_key, lsn).await?,
                 TEST_IMG(&format!("{} at {}", blknum, last_lsn))
            );
        }
@@ -3538,7 +3535,7 @@ mod tests {
                 println!("checking [{idx}][{blknum}] at {lsn}");
                 test_key.field6 = blknum as u32;
                 assert_eq!(
-                    tline.get(test_key, *lsn).no_ondemand_download()?,
+                    tline.get(test_key, *lsn).await?,
                     TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
                 );
             }
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index d87a248bdf..6aee8ce23c 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -109,7 +109,7 @@ pub trait Layer: Send + Sync {
     /// See PageReconstructResult for possible return values. The collected data
     /// is appended to reconstruct_data; the caller should pass an empty struct
     /// on first call, or a struct with a cached older image of the page if one
-    /// is available. If this returns PageReconstructResult::Continue, look up
+    /// is available. If this returns ValueReconstructResult::Continue, look up
     /// the predecessor layer and call again with the same 'reconstruct_data' to
     /// collect more data.
     fn get_value_reconstruct_data(
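Note: the two `try_*` macros deleted below existed only to emulate `?` for the three-way
`PageReconstructResult`. With an ordinary two-way `Result`, every former expansion site
reduces to the built-in operator; the mechanical rewrite applied throughout this patch is:

    // was: let v = try_no_ondemand_download!(self.get(key, lsn));
    // now: let v = self.get(key, lsn).await?;
    //
    // was: let v = try_page_reconstruct_result!(fallible_sync_call());
    // now: let v = fallible_sync_call()?;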
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 0d8a5fc800..146b4191fd 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -335,43 +335,6 @@ pub struct WalReceiverInfo {
     pub last_received_msg_ts: u128,
 }
 
-/// Like `?`, but for [`PageReconstructResult`].
-/// Use it to bubble up the `NeedsDownload` and `Error` to the caller.
-///
-/// Once `std::ops::Try` is stabilized, we should use it instead of this macro.
-#[macro_export]
-macro_rules! try_no_ondemand_download {
-    ($result:expr) => {{
-        let result = $result;
-        match result {
-            PageReconstructResult::Success(value) => value,
-            PageReconstructResult::NeedsDownload(timeline, layer) => {
-                return PageReconstructResult::NeedsDownload(timeline, layer);
-            }
-            PageReconstructResult::Error(e) => return PageReconstructResult::Error(e),
-        }
-    }};
-}
-
-/// Replacement for `?` in functions that return [`PageReconstructResult`].
-///
-/// Given an `expr: Result<T, E>`, use `try_page_reconstruct_result!(expr)`
-/// instead of `(expr)?`.
-/// If `expr` is `Ok(v)`, the macro evaluates to `v`.
-/// If `expr` is `Err(e)`, the macro returns `PageReconstructResult::Error(e.into())`.
-///
-/// Once `std::ops::Try` is stabilized, we should use it instead of this macro.
-#[macro_export]
-macro_rules! try_page_reconstruct_result {
-    ($result:expr) => {{
-        let result = $result;
-        match result {
-            Ok(v) => v,
-            Err(e) => return PageReconstructResult::from(e),
-        }
-    }};
-}
-
 ///
 /// Information about how much history needs to be retained, needed by
 /// Garbage Collection.
@@ -401,15 +364,6 @@ pub struct GcInfo {
     pub pitr_cutoff: Lsn,
 }
 
-pub enum PageReconstructResult<T> {
-    Success(T),
-    /// The given RemoteLayer needs to be downloaded and replaced in the timeline's layer map
-    /// for the operation to succeed. Use [`Timeline::download_remote_layer`] to do it, then
-    /// retry the operation that returned this error.
-    NeedsDownload(Weak<Timeline>, Weak<RemoteLayer>),
-    Error(PageReconstructError),
-}
-
 /// An error happened in a get() operation.
 #[derive(thiserror::Error)]
 pub enum PageReconstructError {
@@ -429,49 +383,6 @@ impl std::fmt::Debug for PageReconstructError {
     }
 }
 
-/// This impl makes it so you can substitute return type
-/// `Result<T, E>` with `PageReconstructResult<T>` in functions
-/// and existing `?` will generally continue to work.
-/// The reason why is thanks to the blanket bound below:
-/// anyhow::Error ensures that `(some error type)` convertible into it
-/// also converts into `PageReconstructError`, so existing conversions hold.
-impl<T, E> From<E> for PageReconstructResult<T>
-where
-    E: Into<PageReconstructError>,
-{
-    fn from(e: E) -> Self {
-        Self::Error(e.into())
-    }
-}
-
-impl<T> PageReconstructResult<T> {
-    /// Treat the need for on-demand download as an error.
-    ///
-    /// **Avoid this function in new code** if you can help it,
-    /// as on-demand download will become the norm in the future,
-    /// especially once we implement layer file eviction.
-    ///
-    /// If you are in an async function, use [`with_ondemand_download`]
-    /// to do the download right here.
-    ///
-    /// If you are in a sync function, change its return type from
-    /// `Result<T, E>` to `PageReconstructResult<T>` and bubble up
-    /// the non-success cases of `PageReconstructResult` to the caller.
-    /// This gives them a chance to do the download and retry.
-    /// Consider using [`try_no_ondemand_download`] for convenience.
-    ///
-    /// For more background, read the comment on [`with_ondemand_download`].
-    pub fn no_ondemand_download(self) -> anyhow::Result<T> {
-        match self {
-            PageReconstructResult::Success(value) => Ok(value),
-            // TODO print more info about the timeline
-            PageReconstructResult::NeedsDownload(_, _) => anyhow::bail!("Layer needs downloading"),
-            PageReconstructResult::Error(e) => {
-                Err(anyhow::Error::new(e).context("Failed to reconstruct the page"))
-            }
-        }
-    }
-}
-
 /// Public interface functions
 impl Timeline {
     /// Get the LSN where this branch was created
@@ -493,15 +404,19 @@ impl Timeline {
 
     /// Look up given page version.
     ///
-    /// NOTE: It is considered an error to 'get' a key that doesn't exist. The abstraction
-    /// above this needs to store suitable metadata to track what data exists with
-    /// what keys, in separate metadata entries. If a non-existent key is requested,
-    /// the Repository implementation may incorrectly return a value from an ancestor
-    /// branch, for example, or waste a lot of cycles chasing the non-existing key.
+    /// If a remote layer file is needed, it is downloaded as part of this
+    /// call.
     ///
-    pub fn get(&self, key: Key, lsn: Lsn) -> PageReconstructResult<Bytes> {
+    /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
+    /// abstraction above this needs to store suitable metadata to track what
+    /// data exists with what keys, in separate metadata entries. If a
+    /// non-existent key is requested, we may incorrectly return a value from
+    /// an ancestor branch, for example, or waste a lot of cycles chasing the
+    /// non-existing key.
+    ///
+    pub async fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
         if !lsn.is_valid() {
-            return PageReconstructResult::from(anyhow!("Invalid LSN"));
+            return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
         }
 
         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
@@ -512,7 +427,7 @@ impl Timeline {
             Some((cached_lsn, cached_img)) => {
                 match cached_lsn.cmp(&lsn) {
                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
-                    Ordering::Equal => return PageReconstructResult::Success(cached_img), // exact LSN match, return the image
+                    Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
                     Ordering::Greater => {
                         unreachable!("the returned lsn should never be after the requested lsn")
                     }
@@ -527,18 +442,14 @@ impl Timeline {
             img: cached_page_img,
         };
 
-        try_no_ondemand_download!(self.get_reconstruct_data(key, lsn, &mut reconstruct_state));
+        self.get_reconstruct_data(key, lsn, &mut reconstruct_state)
+            .await?;
 
         self.metrics
             .reconstruct_time_histo
             .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
     }
 
-    // Like get(), but if a remote layer file is needed, it is downloaded as part of this call.
-    pub async fn get_download(&self, key: Key, lsn: Lsn) -> anyhow::Result<Bytes> {
-        with_ondemand_download(|| self.get(key, lsn)).await
-    }
-
     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     pub fn get_last_record_lsn(&self) -> Lsn {
         self.last_record_lsn.load().last
@@ -1630,12 +1541,12 @@ impl Timeline {
     ///
     /// This function takes the current timeline's locked LayerMap as an argument,
     /// so callers can avoid potential race conditions.
-    fn get_reconstruct_data(
+    async fn get_reconstruct_data(
        &self,
         key: Key,
         request_lsn: Lsn,
         reconstruct_state: &mut ValueReconstructState,
-    ) -> PageReconstructResult<()> {
+    ) -> Result<(), PageReconstructError> {
         // Start from the current timeline.
         let mut timeline_owned;
         let mut timeline = self;
@@ -1662,34 +1573,34 @@ impl Timeline {
             // The function should have updated 'state'
             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
             match result {
-                ValueReconstructResult::Complete => return PageReconstructResult::Success(()),
+                ValueReconstructResult::Complete => return Ok(()),
                 ValueReconstructResult::Continue => {
                     // If we reached an earlier cached page image, we're done.
                     if cont_lsn == cached_lsn + 1 {
                         self.metrics.materialized_page_cache_hit_counter.inc_by(1);
-                        return PageReconstructResult::Success(());
+                        return Ok(());
                    }
                     if prev_lsn <= cont_lsn {
                         // Didn't make any progress in last iteration. Error out to avoid
                         // getting stuck in the loop.
-                        return layer_traversal_error(format!(
+                        return Err(layer_traversal_error(format!(
                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
                             key,
                             Lsn(cont_lsn.0 - 1),
                             request_lsn,
                             timeline.ancestor_lsn
-                        ), traversal_path);
+                        ), traversal_path));
                     }
                     prev_lsn = cont_lsn;
                }
                ValueReconstructResult::Missing => {
-                    return layer_traversal_error(
+                    return Err(layer_traversal_error(
                         format!(
                             "could not find data for key {} at LSN {}, for request at LSN {}",
                             key, cont_lsn, request_lsn
                         ),
                         traversal_path,
-                    );
+                    ));
                 }
             }
 
@@ -1702,7 +1613,7 @@ impl Timeline {
                 );
                 let ancestor = match timeline.get_ancestor_timeline() {
                     Ok(timeline) => timeline,
-                    Err(e) => return PageReconstructResult::from(e),
+                    Err(e) => return Err(PageReconstructError::from(e)),
                 };
                 timeline_owned = ancestor;
                 timeline = &*timeline_owned;
@@ -1711,7 +1622,7 @@ impl Timeline {
             }
 
             #[allow(clippy::never_loop)] // see comment at bottom of this loop
-            '_layer_map_search: loop {
+            'layer_map_search: loop {
                 let remote_layer = {
                     let layers = timeline.layers.read().unwrap();
 
@@ -1730,7 +1641,7 @@ impl Timeline {
                             reconstruct_state,
                         ) {
                             Ok(result) => result,
-                            Err(e) => return PageReconstructResult::from(e),
+                            Err(e) => return Err(PageReconstructError::from(e)),
                         };
                         cont_lsn = lsn_floor;
                         traversal_path.push((
@@ -1755,7 +1666,7 @@ impl Timeline {
                             reconstruct_state,
                         ) {
                             Ok(result) => result,
-                            Err(e) => return PageReconstructResult::from(e),
+                            Err(e) => return Err(PageReconstructError::from(e)),
                         };
                         cont_lsn = lsn_floor;
                         traversal_path.push((
@@ -1788,7 +1699,7 @@ impl Timeline {
                             reconstruct_state,
                         ) {
                             Ok(result) => result,
-                            Err(e) => return PageReconstructResult::from(e),
+                            Err(e) => return Err(PageReconstructError::from(e)),
                         };
                         cont_lsn = lsn_floor;
                         traversal_path.push((
@@ -1812,27 +1723,24 @@ impl Timeline {
                         continue 'outer;
                     }
                 };
-                // Indicate to the caller that we need remote_layer replaced with a downloaded
-                // layer in the layer map. The control flow could be a lot simpler, but the point
-                // of this commit is to prepare this function to
-                // 1. become async
-                // 2. do the download right here, using
-                //    download_remote_layer().await?;
-                //    continue 'layer_map_search;
-                //    For (2), current rustc requires that the layers lock guard is not in scope.
-                //    Hence, the complicated control flow.
+                // Download the remote_layer and replace it in the layer map.
+                // For that, we need to release the mutex. Otherwise, we'd deadlock.
+                //
+                // The control flow is so weird here because `drop(layers)` inside
+                // the if stmt above is not enough for current rustc: it requires
+                // that the layers lock guard is not in scope across the download
+                // await point.
                 let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
                     Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
-                info!(
-                    "need remote layer {}",
-                    remote_layer_as_persistent.traversal_id()
-                );
-                return PageReconstructResult::NeedsDownload(
-                    Weak::clone(&timeline.myself),
-                    Arc::downgrade(&remote_layer),
-                );
+                let id = remote_layer_as_persistent.traversal_id();
+                info!("need remote layer {id}");
+
+                // The next layer doesn't exist locally. Need to download it.
+                // (The control flow is a bit complicated here because we must drop the 'layers'
+                // lock before awaiting on the Future.)
+                info!("on-demand downloading remote layer {id}");
+                timeline.download_remote_layer(remote_layer).await?;
+                continue 'layer_map_search;
             }
         }
     }
@@ -2270,7 +2178,7 @@ impl Timeline {
         partitioning: &KeyPartitioning,
         lsn: Lsn,
         force: bool,
-    ) -> anyhow::Result<HashMap<LayerFileName, LayerFileMetadata>> {
+    ) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
         let timer = self.metrics.create_images_time_histo.start_timer();
         let mut image_layers: Vec<ImageLayer> = Vec::new();
         for partition in partitioning.parts.iter() {
@@ -2286,13 +2194,15 @@ impl Timeline {
             )?;
 
             fail_point!("image-layer-writer-fail-before-finish", |_| {
-                anyhow::bail!("failpoint image-layer-writer-fail-before-finish");
+                Err(PageReconstructError::Other(anyhow::anyhow!(
+                    "failpoint image-layer-writer-fail-before-finish"
+                )))
             });
 
             for range in &partition.ranges {
                 let mut key = range.start;
                 while key < range.end {
-                    let img = match self.get_download(key, lsn).await {
+                    let img = match self.get(key, lsn).await {
                         Ok(img) => img,
                         Err(err) => {
                             // If we fail to reconstruct a VM or FSM page, we can zero the
@@ -2343,7 +2253,7 @@ impl Timeline {
                 self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
             ))
             .collect::<Vec<_>>();
-        par_fsync::par_fsync(&all_paths)?;
+        par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?;
 
         let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
 
@@ -2351,7 +2261,10 @@ impl Timeline {
         let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
         for l in image_layers {
             let path = l.filename();
-            let metadata = timeline_path.join(path.file_name()).metadata()?;
+            let metadata = timeline_path
+                .join(path.file_name())
+                .metadata()
+                .with_context(|| format!("reading metadata of layer file {path}"))?;
 
             layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
 
@@ -2752,8 +2665,7 @@ impl Timeline {
         if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
             let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
 
-            match with_ondemand_download(|| self.find_lsn_for_timestamp(pitr_timestamp)).await?
-            {
+            match self.find_lsn_for_timestamp(pitr_timestamp).await? {
                 LsnForTimestamp::Present(lsn) => lsn,
                 LsnForTimestamp::Future(lsn) => {
                     // The timestamp is in the future. That sounds impossible,
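Note: the `'layer_map_search` rewrite earlier in this file is the core control-flow change
of the patch: instead of returning `NeedsDownload` and making the caller retry,
`get_reconstruct_data` now drops the layer-map lock and awaits the download in place.
In pseudocode (sketch):

    // 'layer_map_search: loop {
    //     let remote = { /* take layers read lock, search; local layer => read, break */ };
    //     // lock guard is dropped here, required before the await point
    //     timeline.download_remote_layer(remote).await?;
    //     continue 'layer_map_search;   // retry: the layer is now local
    // }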
@@ -3022,7 +2934,7 @@ impl Timeline {
         key: Key,
         request_lsn: Lsn,
         mut data: ValueReconstructState,
-    ) -> PageReconstructResult<Bytes> {
+    ) -> Result<Bytes, PageReconstructError> {
         // Perform WAL redo if needed
         data.records.reverse();
 
@@ -3034,11 +2946,11 @@ impl Timeline {
                     key, img_lsn
                 );
-                PageReconstructResult::Success(img.clone())
+                Ok(img.clone())
             } else {
-                PageReconstructResult::from(anyhow!(
+                Err(PageReconstructError::from(anyhow!(
                     "base image for {key} at {request_lsn} not found"
-                ))
+                )))
             }
         } else {
             // We need to do WAL redo.
@@ -3046,12 +2958,12 @@ impl Timeline {
             // If we don't have a base image, then the oldest WAL record better initialize
             // the page
             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
-                PageReconstructResult::from(anyhow!(
+                Err(PageReconstructError::from(anyhow!(
                     "Base image for {} at {} not found, but got {} WAL records",
                     key,
                     request_lsn,
                     data.records.len()
-                ))
+                )))
             } else {
                 if data.img.is_some() {
                     trace!(
@@ -3072,7 +2984,7 @@ impl Timeline {
                     .context("Failed to reconstruct a page image:")
                 {
                     Ok(img) => img,
-                    Err(e) => return PageReconstructResult::from(e),
+                    Err(e) => return Err(PageReconstructError::from(e)),
                 };
 
                 if img.len() == page_cache::PAGE_SZ {
@@ -3087,11 +2999,11 @@ impl Timeline {
                     )
                     .context("Materialized page memoization failed")
                 {
-                    return PageReconstructResult::from(e);
+                    return Err(PageReconstructError::from(e));
                 }
 
-                PageReconstructResult::Success(img)
+                Ok(img)
             }
         }
     }
@@ -3117,7 +3029,7 @@ impl Timeline {
     /// So, the current download attempt will run to completion even if we stop polling.
     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%remote_layer.short_id()))]
     pub async fn download_remote_layer(
-        self: Arc<Self>,
+        &self,
         remote_layer: Arc<RemoteLayer>,
     ) -> anyhow::Result<()> {
         let permit = match Arc::clone(&remote_layer.ongoing_download)
@@ -3133,6 +3045,7 @@ impl Timeline {
         let (sender, receiver) = tokio::sync::oneshot::channel();
         // Spawn a task so that download does not outlive timeline when we detach tenant / delete timeline.
+        let self_clone = self.myself.upgrade().expect("timeline is gone");
         task_mgr::spawn(
             &tokio::runtime::Handle::current(),
             TaskKind::RemoteDownloadTask,
@@ -3141,7 +3054,7 @@ impl Timeline {
             &format!("download layer {}", remote_layer.short_id()),
             false,
             async move {
-                let remote_client = self.remote_client.as_ref().unwrap();
+                let remote_client = self_clone.remote_client.as_ref().unwrap();
 
                 // Does retries + exponential back-off internally.
                 // When this fails, don't layer further retry attempts here.
@@ -3152,12 +3065,12 @@ impl Timeline {
                 if let Ok(size) = &result {
                     // XXX the temp file is still around in Err() case
                     // and consumes space until we clean up upon pageserver restart.
-                    self.metrics.resident_physical_size_gauge.add(*size);
+                    self_clone.metrics.resident_physical_size_gauge.add(*size);
                     // Download complete. Replace the RemoteLayer with the corresponding
                     // Delta- or ImageLayer in the layer map.
-                    let new_layer = remote_layer.create_downloaded_layer(self.conf, *size);
-                    let mut layers = self.layers.write().unwrap();
+                    let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
+                    let mut layers = self_clone.layers.write().unwrap();
                     {
                         let l: Arc<dyn PersistentLayer> = remote_layer.clone();
                         layers.remove_historic(l);
@@ -3254,12 +3167,7 @@ impl Timeline {
             layers
                 .iter_historic_layers()
                 .filter_map(|l| l.downcast_remote_layer())
-                .map({
-                    |l| {
-                        let self_clone = Arc::clone(self);
-                        self_clone.download_remote_layer(l)
-                    }
-                })
+                .map(|l| self.download_remote_layer(l))
                 .collect()
         };
@@ -3321,101 +3229,15 @@ impl Timeline {
     }
 }
 
-/// Helper function to deal with [`PageReconstructResult`].
-///
-/// Takes a sync closure that returns a [`PageReconstructResult`].
-/// If it is [`PageReconstructResult::NeedsDownload`],
-/// do the download and retry the closure.
-///
-/// ### Background
-///
-/// This is a crutch to make on-demand downloads efficient in
-/// our async-sync-async sandwich codebase. Some context:
-///
-/// - The code that does the downloads uses async Rust.
-/// - The code that initiates download is many levels of sync Rust.
-/// - The sync code must wait for the download to finish to
-///   make further progress.
-/// - The sync code is invoked directly from async functions upstack.
-///
-/// Example (there are also much worse ones where the sandwich is taller)
-///
-///     async handle_get_page_at_lsn_request    page_service.rs
-///     sync  get_rel_page_at_lsn              timeline.rs
-///     sync  timeline.get                     timeline.rs
-///     sync  get_reconstruct_data             timeline.rs
-///     async download_remote_layer            timeline.rs
-///
-/// It is not possible to Timeline::download_remote_layer().await within
-/// get_reconstruct_data, so instead, we return [`PageReconstructResult::NeedsDownload`]
-/// which contains references to the [`Timeline`] and [`RemoteLayer`].
-/// We bubble that error upstack to the async code, which can then call
-/// `Timeline::download_remote_layer().await`.
-/// That is _efficient_ because tokio can use the same OS thread to do
-/// other work while we're waiting for the download.
-///
-/// It is a deliberate decision to use a new result type to communicate
-/// the need for download instead of adding another variant to [`PageReconstructError`].
-/// The reason is that with the latter approach, any place that does
-/// `?` on a `Result<T, PageReconstructError>` will implicitly ignore the
-/// need for download. We want that to be explicit, so that
-/// - the code base becomes greppable for places that don't do a download
-/// - future code changes will need to explicitly address the need for on-demand download
-///
-/// Alternatives to consider in the future:
-///
-/// - Inside `get_reconstruct_data`, we can std::thread::spawn a thread
-///   and use it to block_on the download_remote_layer future.
-///   That is obviously inefficient as it creates one thread per download.
-/// - Convert everything to async. The problem here is that the sync
-///   functions are used by many other sync functions. So, the scope
-///   creep of such a conversion is tremendous.
-/// - Compromise between the two: implement async functions for each sync
-///   function. Switch over the hot code paths (GetPage()) to use the
-///   async path, so that the hot path doesn't spawn threads. Other code
-///   paths would remain sync initially, and get converted to async over time.
-///
-pub async fn with_ondemand_download<F, T>(mut f: F) -> Result<T, anyhow::Error>
-where
-    F: Send + FnMut() -> PageReconstructResult<T>,
-    T: Send,
-{
-    loop {
-        let closure_result = f();
-        match closure_result {
-            PageReconstructResult::NeedsDownload(weak_timeline, weak_remote_layer) => {
-                // if the timeline is gone, it has likely been deleted / tenant detached
-                let tl = weak_timeline.upgrade().context("timeline is gone")?;
-                // if the remote layer got removed, retry the function, it might succeed now
-                let remote_layer = match weak_remote_layer.upgrade() {
-                    None => {
-                        info!("remote layer is gone, retrying closure");
-                        continue;
-                    }
-                    Some(l) => l,
-                };
-                // Does retries internally
-                tl.download_remote_layer(remote_layer).await?;
-                // Download successful, retry the closure
-                continue;
-            }
-            PageReconstructResult::Success(closure_value) => return Ok(closure_value),
-            PageReconstructResult::Error(e) => {
-                return Err(anyhow::Error::new(e).context("Failed to reconstruct the page"))
-            }
-        }
-    }
-}
-
 type TraversalPathItem = (
     ValueReconstructResult,
     Lsn,
-    Box<dyn FnOnce() -> TraversalId>,
+    Box<dyn Send + FnOnce() -> TraversalId>,
 );
 
 /// Helper function for get_reconstruct_data() to add the path of layers traversed
 /// to an error, as anyhow context information.
-fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructResult<()> { +fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError { // We want the original 'msg' to be the outermost context. The outermost context // is the most high-level information, which also gets propagated to the client. let mut msg_iter = path @@ -3434,7 +3256,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon // Append all subsequent traversals, and the error message 'msg', as contexts. let msg = msg_iter.fold(err, |err, msg| err.context(msg)); - PageReconstructResult::from(msg) + PageReconstructError::from(msg) } /// Various functions to mutate the timeline. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 1c974f7e2a..0de2e6654d 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -30,8 +30,8 @@ use bytes::{Buf, Bytes, BytesMut}; use tracing::*; use crate::pgdatadir_mapping::*; +use crate::tenant::PageReconstructError; use crate::tenant::Timeline; -use crate::tenant::{with_ondemand_download, PageReconstructError}; use crate::walrecord::*; use crate::ZERO_PAGE; use pageserver_api::reltag::{RelTag, SlruKind}; @@ -55,8 +55,7 @@ impl<'a> WalIngest<'a> { pub async fn new(timeline: &Timeline, startpoint: Lsn) -> anyhow::Result<WalIngest<'a>> { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. - let checkpoint_bytes = - with_ondemand_download(|| timeline.get_checkpoint(startpoint)).await?; + let checkpoint_bytes = timeline.get_checkpoint(startpoint).await?; let checkpoint = CheckPoint::decode(&checkpoint_bytes)?; trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); @@ -107,7 +106,7 @@ impl<'a> WalIngest<'a> { == pg_constants::XLOG_SMGR_CREATE { let create = XlSmgrCreate::decode(&mut buf); - self.ingest_xlog_smgr_create(modification, &create)?; + self.ingest_xlog_smgr_create(modification, &create).await?; } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE @@ -135,7 +134,7 @@ impl<'a> WalIngest<'a> { let dropdb = XlDropDatabase::decode(&mut buf); for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification.drop_dbdir(tablespace_id, dropdb.db_id)?; + modification.drop_dbdir(tablespace_id, dropdb.db_id).await?; } } } else if self.timeline.pg_version == 15 { @@ -159,7 +158,7 @@ impl<'a> WalIngest<'a> { let dropdb = XlDropDatabase::decode(&mut buf); for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification.drop_dbdir(tablespace_id, dropdb.db_id)?; + modification.drop_dbdir(tablespace_id, dropdb.db_id).await?; } } } @@ -214,9 +213,11 @@ impl<'a> WalIngest<'a> { parsed_xact.xid, lsn, ); - modification.drop_twophase_file(parsed_xact.xid)?; + modification.drop_twophase_file(parsed_xact.xid).await?; } else if info == pg_constants::XLOG_XACT_PREPARE { - modification.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]))?; + modification + .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..])) + .await?; } } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -250,11 +251,13 @@ impl<'a> WalIngest<'a> { self.ingest_multixact_create_record(modification, &xlrec)?; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut
buf); - self.ingest_multixact_truncate_record(modification, &xlrec)?; + self.ingest_multixact_truncate_record(modification, &xlrec) + .await?; } } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(modification, &xlrec, decoded)?; + self.ingest_relmap_page(modification, &xlrec, decoded) + .await?; } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_NEXTOID { @@ -534,23 +537,21 @@ impl<'a> WalIngest<'a> { // get calls instead. let req_lsn = modification.tline.get_last_record_lsn(); - let rels = with_ondemand_download(|| { - modification - .tline - .list_rels(src_tablespace_id, src_db_id, req_lsn) - }) - .await?; + let rels = modification + .tline + .list_rels(src_tablespace_id, src_db_id, req_lsn) + .await?; debug!("ingest_xlog_dbase_create: {} rels", rels.len()); // Copy relfilemap - let filemap = with_ondemand_download(|| { - modification - .tline - .get_relmap_file(src_tablespace_id, src_db_id, req_lsn) - }) - .await?; - modification.put_relmap_file(tablespace_id, db_id, filemap)?; + let filemap = modification + .tline + .get_relmap_file(src_tablespace_id, src_db_id, req_lsn) + .await?; + modification + .put_relmap_file(tablespace_id, db_id, filemap) + .await?; let mut num_rels_copied = 0; let mut num_blocks_copied = 0; @@ -558,9 +559,10 @@ impl<'a> WalIngest<'a> { assert_eq!(src_rel.spcnode, src_tablespace_id); assert_eq!(src_rel.dbnode, src_db_id); - let nblocks = - with_ondemand_download(|| modification.tline.get_rel_size(src_rel, req_lsn, true)) - .await?; + let nblocks = modification + .tline + .get_rel_size(src_rel, req_lsn, true) + .await?; let dst_rel = RelTag { spcnode: tablespace_id, dbnode: db_id, @@ -568,19 +570,17 @@ impl<'a> WalIngest<'a> { forknum: src_rel.forknum, }; - modification.put_rel_creation(dst_rel, nblocks)?; + modification.put_rel_creation(dst_rel, nblocks).await?; // Copy content debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks); for blknum in 0..nblocks { debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel); - let content = with_ondemand_download(|| { - modification - .tline - .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true) - }) - .await?; + let content = modification + .tline + .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true) + .await?; modification.put_rel_page_image(dst_rel, blknum, content)?; num_blocks_copied += 1; } @@ -595,9 +595,9 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn ingest_xlog_smgr_create( + async fn ingest_xlog_smgr_create( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rec: &XlSmgrCreate, ) -> anyhow::Result<()> { let rel = RelTag { @@ -606,7 +606,7 @@ impl<'a> WalIngest<'a> { relnode: rec.rnode.relnode, forknum: rec.forknum, }; - self.put_rel_creation(modification, rel)?; + self.put_rel_creation(modification, rel).await?; Ok(()) } @@ -629,7 +629,8 @@ impl<'a> WalIngest<'a> { relnode, forknum: MAIN_FORKNUM, }; - self.put_rel_truncation(modification, rel, rec.blkno)?; + self.put_rel_truncation(modification, rel, rec.blkno) + .await?; } if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { let rel = RelTag { @@ -650,7 +651,8 @@ impl<'a> WalIngest<'a> { let nblocks = self.get_relsize(rel, modification.lsn).await?; if nblocks > fsm_physical_page_no { // check if something to do: FSM is larger than truncate position - self.put_rel_truncation(modification, rel, fsm_physical_page_no)?; 
+ self.put_rel_truncation(modification, rel, fsm_physical_page_no) + .await?; } } if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { @@ -671,7 +673,8 @@ impl<'a> WalIngest<'a> { let nblocks = self.get_relsize(rel, modification.lsn).await?; if nblocks > vm_page_no { // check if something to do: VM is larger than truncate position - self.put_rel_truncation(modification, rel, vm_page_no)?; + self.put_rel_truncation(modification, rel, vm_page_no) + .await?; } } Ok(()) @@ -740,10 +743,12 @@ impl<'a> WalIngest<'a> { relnode: xnode.relnode, }; let last_lsn = self.timeline.get_last_record_lsn(); - if with_ondemand_download(|| modification.tline.get_rel_exists(rel, last_lsn, true)) + if modification + .tline + .get_rel_exists(rel, last_lsn, true) .await? { - self.put_rel_drop(modification, rel)?; + self.put_rel_drop(modification, rel).await?; } } } @@ -795,16 +800,16 @@ impl<'a> WalIngest<'a> { // instead. let req_lsn = modification.tline.get_last_record_lsn(); - let slru_segments = with_ondemand_download(|| { - modification - .tline - .list_slru_segments(SlruKind::Clog, req_lsn) - }) - .await?; + let slru_segments = modification + .tline + .list_slru_segments(SlruKind::Clog, req_lsn) + .await?; for segno in slru_segments { let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; if slru_may_delete_clogsegment(segpage, xlrec.pageno) { - modification.drop_slru_segment(SlruKind::Clog, segno)?; + modification + .drop_slru_segment(SlruKind::Clog, segno) + .await?; trace!("Drop CLOG segment {:>04X}", segno); } } @@ -891,9 +896,9 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn ingest_multixact_truncate_record( + async fn ingest_multixact_truncate_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, xlrec: &XlMultiXactTruncate, ) -> Result<()> { self.checkpoint.oldestMulti = xlrec.end_trunc_off; @@ -909,7 +914,9 @@ impl<'a> WalIngest<'a> { // Delete all the segments except the last one. The last segment can still // contain, possibly partially, valid data. 
while segment != endsegment { - modification.drop_slru_segment(SlruKind::MultiXactMembers, segment as u32)?; + modification + .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32) + .await?; /* move to next segment, handling wraparound correctly */ if segment == maxsegment { @@ -925,9 +932,9 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn ingest_relmap_page( + async fn ingest_relmap_page( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, xlrec: &XlRelmapUpdate, decoded: &DecodedWALRecord, ) -> Result<()> { @@ -936,17 +943,19 @@ impl<'a> WalIngest<'a> { // skip xl_relmap_update buf.advance(12); - modification.put_relmap_file(xlrec.tsid, xlrec.dbid, Bytes::copy_from_slice(&buf[..]))?; + modification + .put_relmap_file(xlrec.tsid, xlrec.dbid, Bytes::copy_from_slice(&buf[..])) + .await?; Ok(()) } - fn put_rel_creation( + async fn put_rel_creation( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rel: RelTag, ) -> Result<()> { - modification.put_rel_creation(rel, 0)?; + modification.put_rel_creation(rel, 0).await?; Ok(()) } @@ -974,28 +983,31 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn put_rel_truncation( + async fn put_rel_truncation( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rel: RelTag, nblocks: BlockNumber, ) -> anyhow::Result<()> { - modification.put_rel_truncation(rel, nblocks)?; + modification.put_rel_truncation(rel, nblocks).await?; Ok(()) } - fn put_rel_drop(&mut self, modification: &mut DatadirModification, rel: RelTag) -> Result<()> { - modification.put_rel_drop(rel)?; + async fn put_rel_drop( + &mut self, + modification: &mut DatadirModification<'_>, + rel: RelTag, + ) -> Result<()> { + modification.put_rel_drop(rel).await?; Ok(()) } async fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> anyhow::Result<BlockNumber> { - let exists = - with_ondemand_download(|| self.timeline.get_rel_exists(rel, lsn, true)).await?; + let exists = self.timeline.get_rel_exists(rel, lsn, true).await?; let nblocks = if !exists { 0 } else { - with_ondemand_download(|| self.timeline.get_rel_size(rel, lsn, true)).await? + self.timeline.get_rel_size(rel, lsn, true).await? }; Ok(nblocks) } @@ -1011,19 +1023,17 @@ impl<'a> WalIngest<'a> { // record. // TODO: would be nice if to be more explicit about it let last_lsn = modification.lsn; - let old_nblocks = - if !with_ondemand_download(|| self.timeline.get_rel_exists(rel, last_lsn, true)).await? - { - // create it with 0 size initially, the logic below will extend it - modification.put_rel_creation(rel, 0)?; - 0 - } else { - with_ondemand_download(|| self.timeline.get_rel_size(rel, last_lsn, true)).await? - }; + let old_nblocks = if !self.timeline.get_rel_exists(rel, last_lsn, true).await? { + // create it with 0 size initially, the logic below will extend it + modification.put_rel_creation(rel, 0).await?; + 0 + } else { + self.timeline.get_rel_size(rel, last_lsn, true).await? + }; if new_nblocks > old_nblocks { //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks); - modification.put_rel_extend(rel, new_nblocks)?; + modification.put_rel_extend(rel, new_nblocks).await?; // fill the gap with zeros for gap_blknum in old_nblocks..blknum { @@ -1063,16 +1073,19 @@ impl<'a> WalIngest<'a> { // record.
// TODO: would be nice if to be more explicit about it let last_lsn = self.timeline.get_last_record_lsn(); - let old_nblocks = if !with_ondemand_download(|| { - self.timeline.get_slru_segment_exists(kind, segno, last_lsn) - }) - .await? + let old_nblocks = if !self + .timeline + .get_slru_segment_exists(kind, segno, last_lsn) + .await? { // create it with 0 size initially, the logic below will extend it - modification.put_slru_segment_creation(kind, segno, 0)?; + modification + .put_slru_segment_creation(kind, segno, 0) + .await?; 0 } else { - with_ondemand_download(|| self.timeline.get_slru_segment_size(kind, segno, last_lsn)) + self.timeline + .get_slru_segment_size(kind, segno, last_lsn) .await? }; @@ -1124,7 +1137,7 @@ mod tests { async fn init_walingest_test(tline: &Timeline) -> Result { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; - m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file + m.put_relmap_file(0, 111, Bytes::from("")).await?; // dummy relmapper file m.commit()?; let walingest = WalIngest::new(tline, Lsn(0x10)).await?; @@ -1138,7 +1151,7 @@ mod tests { let mut walingest = init_walingest_test(&tline).await?; let mut m = tline.begin_modification(Lsn(0x20)); - walingest.put_rel_creation(&mut m, TESTREL_A)?; + walingest.put_rel_creation(&mut m, TESTREL_A).await?; walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2")) .await?; @@ -1163,132 +1176,103 @@ mod tests { // The relation was created at LSN 2, not visible at LSN 1 yet. assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x10), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x10), false).await?, false ); assert!(tline .get_rel_size(TESTREL_A, Lsn(0x10), false) - .no_ondemand_download() + .await .is_err()); assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x20), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x20), false).await?, true ); - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x20), false) - .no_ondemand_download()?, - 1 - ); - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x50), false) - .no_ondemand_download()?, - 3 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false).await?, 1); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false).await?, 3); // Check page contents at each LSN assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 0 at 2") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 2 at 5") ); // Truncate last block let mut m = tline.begin_modification(Lsn(0x60)); - walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?; + walingest.put_rel_truncation(&mut m, TESTREL_A, 2).await?; m.commit()?; 
assert_current_logical_size(&tline, Lsn(0x60)); // Check reported size and contents after truncation - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x60), false) - .no_ondemand_download()?, - 2 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60), false).await?, 2); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 1 at 4") ); // should still see the truncated block with older LSN - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x50), false) - .no_ondemand_download()?, - 3 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false).await?, 3); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 2 at 5") ); // Truncate to zero length let mut m = tline.begin_modification(Lsn(0x68)); - walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?; + walingest.put_rel_truncation(&mut m, TESTREL_A, 0).await?; m.commit()?; - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x68), false) - .no_ondemand_download()?, - 0 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68), false).await?, 0); // Extend from 0 to 2 blocks, leaving a gap let mut m = tline.begin_modification(Lsn(0x70)); @@ -1296,22 +1280,17 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1")) .await?; m.commit()?; - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x70), false) - .no_ondemand_download()?, - 2 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70), false).await?, 2); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false) - .no_ondemand_download()?, + .await?, ZERO_PAGE ); assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 1") ); @@ -1321,24 +1300,19 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500")) .await?; m.commit()?; - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x80), false) - .no_ondemand_download()?, - 1501 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80), false).await?, 1501); for blk in 2..1500 { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false) - .no_ondemand_download()?, + .await?, ZERO_PAGE ); } assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false) - .no_ondemand_download()?, + .await?, TEST_IMG("foo blk 1500") ); @@ -1361,28 +1335,19 @@ mod tests { // Check that rel exists and size is correct assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x20), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x20), false).await?, true ); - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x20), false) - .no_ondemand_download()?, - 1 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false).await?, 1); // Drop rel let mut m = tline.begin_modification(Lsn(0x30)); - walingest.put_rel_drop(&mut m, TESTREL_A)?; + walingest.put_rel_drop(&mut m, TESTREL_A).await?; m.commit()?; // Check that rel is not visible anymore assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x30), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x30), false).await?, false ); @@ -1398,17 +1363,10 @@ mod tests { // Check that rel exists and size is correct assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x40), false) - .no_ondemand_download()?, + 
tline.get_rel_exists(TESTREL_A, Lsn(0x40), false).await?, true ); - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x40), false) - .no_ondemand_download()?, - 1 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x40), false).await?, 1); Ok(()) } @@ -1435,26 +1393,20 @@ mod tests { // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x10), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x10), false).await?, false ); assert!(tline .get_rel_size(TESTREL_A, Lsn(0x10), false) - .no_ondemand_download() + .await .is_err()); assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x20), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x20), false).await?, true ); assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x20), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(0x20), false).await?, relsize ); @@ -1465,7 +1417,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false) - .no_ondemand_download()?, + .await?, TEST_IMG(&data) ); } @@ -1473,16 +1425,11 @@ mod tests { // Truncate relation so that second segment was dropped // - only leave one page let mut m = tline.begin_modification(Lsn(0x60)); - walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?; + walingest.put_rel_truncation(&mut m, TESTREL_A, 1).await?; m.commit()?; // Check reported size and contents after truncation - assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x60), false) - .no_ondemand_download()?, - 1 - ); + assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60), false).await?, 1); for blkno in 0..1 { let lsn = Lsn(0x20); @@ -1490,16 +1437,14 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false) - .no_ondemand_download()?, + .await?, TEST_IMG(&data) ); } // should still see all blocks with older LSN assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x50), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(0x50), false).await?, relsize ); for blkno in 0..relsize { @@ -1508,7 +1453,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false) - .no_ondemand_download()?, + .await?, TEST_IMG(&data) ); } @@ -1526,15 +1471,11 @@ mod tests { m.commit()?; assert_eq!( - tline - .get_rel_exists(TESTREL_A, Lsn(0x80), false) - .no_ondemand_download()?, + tline.get_rel_exists(TESTREL_A, Lsn(0x80), false).await?, true ); assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(0x80), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(0x80), false).await?, relsize ); // Check relation content @@ -1544,7 +1485,7 @@ mod tests { assert_eq!( tline .get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false) - .no_ondemand_download()?, + .await?, TEST_IMG(&data) ); } @@ -1574,21 +1515,19 @@ mod tests { assert_current_logical_size(&tline, Lsn(lsn)); assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(lsn), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?, RELSEG_SIZE + 1 ); // Truncate one block lsn += 0x10; let mut m = tline.begin_modification(Lsn(lsn)); - walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE)?; + walingest + .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE) + .await?; m.commit()?; assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(lsn), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?, RELSEG_SIZE ); assert_current_logical_size(&tline, Lsn(lsn)); @@ -1596,12 +1535,12 @@ mod tests { 
// Truncate another block lsn += 0x10; let mut m = tline.begin_modification(Lsn(lsn)); - walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1)?; + walingest + .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1) + .await?; m.commit()?; assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(lsn), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?, RELSEG_SIZE - 1 ); assert_current_logical_size(&tline, Lsn(lsn)); @@ -1612,12 +1551,12 @@ mod tests { while size >= 0 { lsn += 0x10; let mut m = tline.begin_modification(Lsn(lsn)); - walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?; + walingest + .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber) + .await?; m.commit()?; assert_eq!( - tline - .get_rel_size(TESTREL_A, Lsn(lsn), false) - .no_ondemand_download()?, + tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?, size as BlockNumber ); diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 05d5788028..d12a0223a1 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -15,7 +15,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): env.pageserver.allowed_errors.extend( [ - ".*Failed to reconstruct the page.*", + ".*Failed to load delta layer.*", ".*could not find data for key.*", ".*is not active. Current state: Broken.*", ".*will not become active. Current state: Broken.*", @@ -99,7 +99,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # Third timeline will also fail during basebackup, because the layer file is corrupt. # It will fail when we try to read (and reconstruct) a page from it, ergo the error message. # (We don't check layer file contents on startup, when loading the timeline) - with pytest.raises(Exception, match="Failed to reconstruct the page") as err: + with pytest.raises(Exception, match="Failed to load delta layer") as err: pg3.start() log.info( f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"
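
Note on the pattern above: every conversion in this patch has the same shape. A getter that used to return the bespoke PageReconstructResult, and had to be driven through a sync retry closure, becomes an async fn returning Result<T, PageReconstructError>, so the caller does a single .await? and any on-demand layer download happens inside the getter. A minimal self-contained sketch of the new calling convention follows; the types and bodies are illustrative stand-ins, not the pageserver's real definitions.

use anyhow::anyhow;
use std::fmt;

// Stand-in for the pageserver's typed error; the variant names mirror the
// patch, the payloads are simplified.
#[derive(Debug)]
enum PageReconstructError {
    WalRedo(String),
    Other(anyhow::Error),
}

impl fmt::Display for PageReconstructError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::WalRedo(msg) => write!(f, "WAL redo failed: {msg}"),
            Self::Other(err) => write!(f, "{err:#}"),
        }
    }
}

impl std::error::Error for PageReconstructError {}

struct Timeline;

impl Timeline {
    // Post-conversion shape: plain async, typed error. Any layer download
    // happens inside this call, so there is no retry loop at the call site
    // and no way for a stray `?` to skip a needed download.
    async fn get_rel_size(&self, nblocks_hint: u32) -> Result<u32, PageReconstructError> {
        if nblocks_hint == 0 {
            return Err(PageReconstructError::Other(anyhow!("relation not found")));
        }
        Ok(nblocks_hint)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let tline = Timeline;
    // Old shape: with_ondemand_download(|| tline.get_rel_size(..)).await?
    // New shape: one direct await; `?` converts the typed error into anyhow.
    let nblocks = tline.get_rel_size(3).await?;
    println!("nblocks = {nblocks}");
    Ok(())
}

A typed error also lets HTTP and API layers map each variant to their own error type explicitly instead of collapsing everything into one anyhow blob.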
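
The download_remote_layer change deserves one more illustration: the receiver goes from self: Arc<Self> to &self, and the spawned download task gets its owned handle by upgrading the timeline's weak self-pointer. A sketch of that pattern, assuming a myself: Weak<Timeline> field as in the hunk; the task body and error plumbing are stand-ins, not the actual task_mgr machinery.

use std::sync::{Arc, Weak};

struct Timeline {
    // Weak self-pointer, populated at construction time.
    myself: Weak<Timeline>,
}

impl Timeline {
    fn new() -> Arc<Timeline> {
        // Arc::new_cyclic hands the closure a Weak to the Arc being built,
        // letting the struct keep a handle to itself without leaking a cycle.
        Arc::new_cyclic(|weak| Timeline {
            myself: weak.clone(),
        })
    }

    async fn download_remote_layer(&self) -> anyhow::Result<()> {
        // Upgrade to an owned Arc so the spawned task is not tied to the
        // lifetime of `&self`; if the timeline had already been dropped,
        // the upgrade would fail instead of dangling.
        let self_clone = self.myself.upgrade().expect("timeline is gone");
        tokio::spawn(async move {
            // The task owns `self_clone`, so the Timeline outlives the
            // download even though the caller only held `&self`.
            let _timeline: &Timeline = &self_clone;
            // ... perform the actual download with self_clone here ...
        })
        .await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let tline = Timeline::new();
    tline.download_remote_layer().await
}

The payoff is visible in the .map(|l| self.download_remote_layer(l)) hunk: callers can now start many downloads from a shared reference without cloning an Arc per call.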