From 8b692e131bdb5010a784032cc5e399f15d256bd6 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 3 Jan 2023 14:44:42 +0200 Subject: [PATCH] Enable on-demand download in WalIngest. (#3233) Makes the top-level functions in WalIngest async, and replaces no_ondemand_download calls with with_ondemand_download. This hopefully fixes the problem reported in issue #3230, although I don't have a self-contained test case for it. --- pageserver/src/basebackup.rs | 15 +- pageserver/src/import_datadir.rs | 13 +- pageserver/src/walingest.rs | 332 +++++++++--------- .../src/walreceiver/walreceiver_connection.rs | 21 +- 4 files changed, 187 insertions(+), 194 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index e537048489..4052f13875 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -214,10 +214,11 @@ where let mut segment_data: Vec = vec![]; for blknum in startblk..endblk { - let img = self - .timeline - .get_rel_page_at_lsn(tag, blknum, self.lsn, false) - .no_ondemand_download()?; + let img = with_ondemand_download(|| { + self.timeline + .get_rel_page_at_lsn(tag, blknum, self.lsn, false) + }) + .await?; segment_data.extend_from_slice(&img[..]); } @@ -313,10 +314,8 @@ where // XLOG_TBLSPC_DROP records. But we probably should just // throw an error on CREATE TABLESPACE in the first place. if !has_relmap_file - && self - .timeline - .list_rels(spcnode, dbnode, self.lsn) - .no_ondemand_download()? + && with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn)) + .await? .is_empty() { return Ok(()); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index bac27f69de..ca1514dd00 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -99,7 +99,8 @@ pub async fn import_timeline_from_postgres_datadir( tline, Lsn(pg_control.checkPointCopy.redo), pgdata_lsn, - )?; + ) + .await?; Ok(()) } @@ -240,7 +241,7 @@ async fn import_slru( /// Scan PostgreSQL WAL files in given directory and load all records between /// 'startpoint' and 'endpoint' into the repository. -fn import_wal( +async fn import_wal( walpath: &Path, tline: &Timeline, startpoint: Lsn, @@ -253,7 +254,7 @@ fn import_wal( let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; - let mut walingest = WalIngest::new(tline, startpoint).no_ondemand_download()?; + let mut walingest = WalIngest::new(tline, startpoint).await?; while last_lsn <= endpoint { // FIXME: assume postgresql tli 1 for now @@ -291,7 +292,7 @@ fn import_wal( if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded) - .no_ondemand_download()?; + .await?; last_lsn = lsn; nrecords += 1; @@ -375,7 +376,7 @@ pub async fn import_wal_from_tar( let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE); let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE); let mut last_lsn = start_lsn; - let mut walingest = WalIngest::new(tline, start_lsn).no_ondemand_download()?; + let mut walingest = WalIngest::new(tline, start_lsn).await?; // Ingest wal until end_lsn info!("importing wal until {}", end_lsn); @@ -425,7 +426,7 @@ pub async fn import_wal_from_tar( if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded) - .no_ondemand_download()?; + .await?; last_lsn = lsn; debug!("imported record at {} (end {})", lsn, end_lsn); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 031b80a6e0..1c974f7e2a 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -21,7 +21,6 @@ //! redo Postgres process, but some records it can handle directly with //! bespoken Rust code. -use anyhow::Context; use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; @@ -31,12 +30,10 @@ use bytes::{Buf, Bytes, BytesMut}; use tracing::*; use crate::pgdatadir_mapping::*; -use crate::tenant::PageReconstructResult; use crate::tenant::Timeline; -use crate::try_page_reconstruct_result as try_prr; +use crate::tenant::{with_ondemand_download, PageReconstructError}; use crate::walrecord::*; use crate::ZERO_PAGE; -use crate::{try_no_ondemand_download, try_page_reconstruct_result}; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; @@ -55,16 +52,15 @@ pub struct WalIngest<'a> { } impl<'a> WalIngest<'a> { - pub fn new(timeline: &Timeline, startpoint: Lsn) -> PageReconstructResult { + pub async fn new(timeline: &Timeline, startpoint: Lsn) -> anyhow::Result { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. - let checkpoint_bytes = try_no_ondemand_download!(timeline.get_checkpoint(startpoint)); - let checkpoint = try_page_reconstruct_result!( - CheckPoint::decode(&checkpoint_bytes).context("Failed to decode checkpoint bytes") - ); + let checkpoint_bytes = + with_ondemand_download(|| timeline.get_checkpoint(startpoint)).await?; + let checkpoint = CheckPoint::decode(&checkpoint_bytes)?; trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); - PageReconstructResult::Success(WalIngest { + Ok(WalIngest { timeline, checkpoint, checkpoint_modified: false, @@ -79,18 +75,15 @@ impl<'a> WalIngest<'a> { /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the /// relations/pages that the record affects. /// - pub fn ingest_record( + pub async fn ingest_record( &mut self, recdata: Bytes, lsn: Lsn, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, decoded: &mut DecodedWALRecord, - ) -> PageReconstructResult<()> { + ) -> anyhow::Result<()> { modification.lsn = lsn; - try_prr!( - decode_wal_record(recdata, decoded, self.timeline.pg_version) - .context("failed decoding wal record") - ); + decode_wal_record(recdata, decoded, self.timeline.pg_version)?; let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -105,7 +98,8 @@ impl<'a> WalIngest<'a> { if decoded.xl_rmid == pg_constants::RM_HEAP_ID || decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - try_prr!(self.ingest_heapam_record(&mut buf, modification, decoded)); + self.ingest_heapam_record(&mut buf, modification, decoded) + .await?; } // Handle other special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID @@ -113,13 +107,14 @@ impl<'a> WalIngest<'a> { == pg_constants::XLOG_SMGR_CREATE { let create = XlSmgrCreate::decode(&mut buf); - try_prr!(self.ingest_xlog_smgr_create(modification, &create)); + self.ingest_xlog_smgr_create(modification, &create)?; } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&mut buf); - try_prr!(self.ingest_xlog_smgr_truncate(modification, &truncate)); + self.ingest_xlog_smgr_truncate(modification, &truncate) + .await?; } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { debug!( "handle RM_DBASE_ID for Postgres version {:?}", @@ -132,14 +127,15 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); debug!("XLOG_DBASE_CREATE v14"); - try_prr!(self.ingest_xlog_dbase_create(modification, &createdb)); + self.ingest_xlog_dbase_create(modification, &createdb) + .await?; } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - try_prr!(modification.drop_dbdir(tablespace_id, dropdb.db_id)); + modification.drop_dbdir(tablespace_id, dropdb.db_id)?; } } } else if self.timeline.pg_version == 15 { @@ -155,14 +151,15 @@ impl<'a> WalIngest<'a> { // So we can reuse XlCreateDatabase here. debug!("XLOG_DBASE_CREATE_FILE_COPY"); let createdb = XlCreateDatabase::decode(&mut buf); - try_prr!(self.ingest_xlog_dbase_create(modification, &createdb)); + self.ingest_xlog_dbase_create(modification, &createdb) + .await?; } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - try_prr!(modification.drop_dbdir(tablespace_id, dropdb.db_id)); + modification.drop_dbdir(tablespace_id, dropdb.db_id)?; } } } @@ -174,38 +171,42 @@ impl<'a> WalIngest<'a> { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - try_prr!(self.put_slru_page_image( + self.put_slru_page_image( modification, SlruKind::Clog, segno, rpageno, ZERO_PAGE.clone(), - )); + ) + .await?; } else { assert!(info == pg_constants::CLOG_TRUNCATE); let xlrec = XlClogTruncate::decode(&mut buf); - try_prr!(self.ingest_clog_truncate_record(modification, &xlrec)); + self.ingest_clog_truncate_record(modification, &xlrec) + .await?; } } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - try_prr!(self.ingest_xact_record( + self.ingest_xact_record( modification, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT, - )); + ) + .await?; } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - try_prr!(self.ingest_xact_record( + self.ingest_xact_record( modification, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT_PREPARED, - )); + ) + .await?; // Remove twophase file. see RemoveTwoPhaseFile() in postgres code trace!( "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", @@ -213,10 +214,9 @@ impl<'a> WalIngest<'a> { parsed_xact.xid, lsn, ); - try_prr!(modification.drop_twophase_file(parsed_xact.xid)); + modification.drop_twophase_file(parsed_xact.xid)?; } else if info == pg_constants::XLOG_XACT_PREPARE { - try_prr!(modification - .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]))); + modification.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]))?; } } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -225,34 +225,36 @@ impl<'a> WalIngest<'a> { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - try_prr!(self.put_slru_page_image( + self.put_slru_page_image( modification, SlruKind::MultiXactOffsets, segno, rpageno, ZERO_PAGE.clone(), - )); + ) + .await?; } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - try_prr!(self.put_slru_page_image( + self.put_slru_page_image( modification, SlruKind::MultiXactMembers, segno, rpageno, ZERO_PAGE.clone(), - )); + ) + .await?; } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); - try_prr!(self.ingest_multixact_create_record(modification, &xlrec)); + self.ingest_multixact_create_record(modification, &xlrec)?; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); - try_prr!(self.ingest_multixact_truncate_record(modification, &xlrec)); + self.ingest_multixact_truncate_record(modification, &xlrec)?; } } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { let xlrec = XlRelmapUpdate::decode(&mut buf); - try_prr!(self.ingest_relmap_page(modification, &xlrec, decoded)); + self.ingest_relmap_page(modification, &xlrec, decoded)?; } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_NEXTOID { @@ -266,9 +268,7 @@ impl<'a> WalIngest<'a> { { let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; buf.copy_to_slice(&mut checkpoint_bytes); - let xlog_checkpoint = try_prr!( - CheckPoint::decode(&checkpoint_bytes).context("deserialize CheckPoint") - ); + let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?; trace!( "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", xlog_checkpoint.oldestXid, @@ -289,32 +289,32 @@ impl<'a> WalIngest<'a> { // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { - try_no_ondemand_download!(self.ingest_decoded_block(modification, lsn, decoded, blk)); + self.ingest_decoded_block(modification, lsn, decoded, blk) + .await?; } // If checkpoint data was updated, store the new version in the repository if self.checkpoint_modified { - let new_checkpoint_bytes = - try_prr!(self.checkpoint.encode().context("encode checkpoint")); + let new_checkpoint_bytes = self.checkpoint.encode()?; - try_prr!(modification.put_checkpoint(new_checkpoint_bytes)); + modification.put_checkpoint(new_checkpoint_bytes)?; self.checkpoint_modified = false; } // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - try_prr!(modification.commit()); + modification.commit()?; - PageReconstructResult::Success(()) + Ok(()) } - fn ingest_decoded_block( + async fn ingest_decoded_block( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, lsn: Lsn, decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, - ) -> PageReconstructResult<()> { + ) -> Result<(), PageReconstructError> { let rel = RelTag { spcnode: blk.rnode_spcnode, dbnode: blk.rnode_dbnode, @@ -334,7 +334,7 @@ impl<'a> WalIngest<'a> { && (decoded.xl_info == pg_constants::XLOG_FPI || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) // compression of WAL is not yet supported: fall back to storing the original WAL record - && !try_prr!(postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)) + && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)? { // Extract page image from FPI record let img_len = blk.bimg_len as usize; @@ -356,28 +356,25 @@ impl<'a> WalIngest<'a> { page_set_lsn(&mut image, lsn) } assert_eq!(image.len(), BLCKSZ as usize); - try_no_ondemand_download!(self.put_rel_page_image( - modification, - rel, - blk.blkno, - image.freeze() - )); + self.put_rel_page_image(modification, rel, blk.blkno, image.freeze()) + .await?; } else { let rec = NeonWalRecord::Postgres { will_init: blk.will_init || blk.apply_image, rec: decoded.record.clone(), }; - try_prr!(self.put_rel_wal_record(modification, rel, blk.blkno, rec)); + self.put_rel_wal_record(modification, rel, blk.blkno, rec) + .await?; } - PageReconstructResult::Success(()) + Ok(()) } - fn ingest_heapam_record( + async fn ingest_heapam_record( &mut self, buf: &mut Bytes, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, decoded: &mut DecodedWALRecord, - ) -> Result<()> { + ) -> anyhow::Result<()> { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -456,7 +453,7 @@ impl<'a> WalIngest<'a> { // replaying it would fail to find the previous image of the page, because // it doesn't exist. So check if the VM page(s) exist, and skip the WAL // record if it doesn't. - let vm_size = self.get_relsize(vm_rel, modification.lsn)?; + let vm_size = self.get_relsize(vm_rel, modification.lsn).await?; if let Some(blknum) = new_vm_blk { if blknum >= vm_size { new_vm_blk = None; @@ -481,7 +478,8 @@ impl<'a> WalIngest<'a> { old_heap_blkno, flags: pg_constants::VISIBILITYMAP_VALID_BITS, }, - )?; + ) + .await?; } else { // Clear VM bits for one heap page, or for two pages that reside on // different VM pages. @@ -495,7 +493,8 @@ impl<'a> WalIngest<'a> { old_heap_blkno: None, flags: pg_constants::VISIBILITYMAP_VALID_BITS, }, - )?; + ) + .await?; } if let Some(old_vm_blk) = old_vm_blk { self.put_rel_wal_record( @@ -507,7 +506,8 @@ impl<'a> WalIngest<'a> { old_heap_blkno, flags: pg_constants::VISIBILITYMAP_VALID_BITS, }, - )?; + ) + .await?; } } } @@ -517,9 +517,9 @@ impl<'a> WalIngest<'a> { } /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. - fn ingest_xlog_dbase_create( + async fn ingest_xlog_dbase_create( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rec: &XlCreateDatabase, ) -> anyhow::Result<()> { let db_id = rec.db_id; @@ -534,18 +534,22 @@ impl<'a> WalIngest<'a> { // get calls instead. let req_lsn = modification.tline.get_last_record_lsn(); - let rels = modification - .tline - .list_rels(src_tablespace_id, src_db_id, req_lsn) - .no_ondemand_download()?; + let rels = with_ondemand_download(|| { + modification + .tline + .list_rels(src_tablespace_id, src_db_id, req_lsn) + }) + .await?; debug!("ingest_xlog_dbase_create: {} rels", rels.len()); // Copy relfilemap - let filemap = modification - .tline - .get_relmap_file(src_tablespace_id, src_db_id, req_lsn) - .no_ondemand_download()?; + let filemap = with_ondemand_download(|| { + modification + .tline + .get_relmap_file(src_tablespace_id, src_db_id, req_lsn) + }) + .await?; modification.put_relmap_file(tablespace_id, db_id, filemap)?; let mut num_rels_copied = 0; @@ -554,10 +558,9 @@ impl<'a> WalIngest<'a> { assert_eq!(src_rel.spcnode, src_tablespace_id); assert_eq!(src_rel.dbnode, src_db_id); - let nblocks = modification - .tline - .get_rel_size(src_rel, req_lsn, true) - .no_ondemand_download()?; + let nblocks = + with_ondemand_download(|| modification.tline.get_rel_size(src_rel, req_lsn, true)) + .await?; let dst_rel = RelTag { spcnode: tablespace_id, dbnode: db_id, @@ -572,10 +575,12 @@ impl<'a> WalIngest<'a> { for blknum in 0..nblocks { debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel); - let content = modification - .tline - .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true) - .no_ondemand_download()?; + let content = with_ondemand_download(|| { + modification + .tline + .get_rel_page_at_lsn(src_rel, blknum, req_lsn, true) + }) + .await?; modification.put_rel_page_image(dst_rel, blknum, content)?; num_blocks_copied += 1; } @@ -594,7 +599,7 @@ impl<'a> WalIngest<'a> { &mut self, modification: &mut DatadirModification, rec: &XlSmgrCreate, - ) -> Result<()> { + ) -> anyhow::Result<()> { let rel = RelTag { spcnode: rec.rnode.spcnode, dbnode: rec.rnode.dbnode, @@ -608,11 +613,11 @@ impl<'a> WalIngest<'a> { /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record. /// /// This is the same logic as in PostgreSQL's smgr_redo() function. - fn ingest_xlog_smgr_truncate( + async fn ingest_xlog_smgr_truncate( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rec: &XlSmgrTruncate, - ) -> Result<()> { + ) -> anyhow::Result<()> { let spcnode = rec.rnode.spcnode; let dbnode = rec.rnode.dbnode; let relnode = rec.rnode.relnode; @@ -642,7 +647,7 @@ impl<'a> WalIngest<'a> { modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?; fsm_physical_page_no += 1; } - let nblocks = self.get_relsize(rel, modification.lsn)?; + let nblocks = self.get_relsize(rel, modification.lsn).await?; if nblocks > fsm_physical_page_no { // check if something to do: FSM is larger than truncate position self.put_rel_truncation(modification, rel, fsm_physical_page_no)?; @@ -663,7 +668,7 @@ impl<'a> WalIngest<'a> { modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?; vm_page_no += 1; } - let nblocks = self.get_relsize(rel, modification.lsn)?; + let nblocks = self.get_relsize(rel, modification.lsn).await?; if nblocks > vm_page_no { // check if something to do: VM is larger than truncate position self.put_rel_truncation(modification, rel, vm_page_no)?; @@ -674,9 +679,9 @@ impl<'a> WalIngest<'a> { /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records. /// - fn ingest_xact_record( + async fn ingest_xact_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, parsed: &XlXactParsedRecord, is_commit: bool, ) -> anyhow::Result<()> { @@ -735,10 +740,8 @@ impl<'a> WalIngest<'a> { relnode: xnode.relnode, }; let last_lsn = self.timeline.get_last_record_lsn(); - if modification - .tline - .get_rel_exists(rel, last_lsn, true) - .no_ondemand_download()? + if with_ondemand_download(|| modification.tline.get_rel_exists(rel, last_lsn, true)) + .await? { self.put_rel_drop(modification, rel)?; } @@ -747,9 +750,9 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn ingest_clog_truncate_record( + async fn ingest_clog_truncate_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, xlrec: &XlClogTruncate, ) -> anyhow::Result<()> { info!( @@ -791,11 +794,14 @@ impl<'a> WalIngest<'a> { // it. So we use the previous record's LSN in the get calls // instead. let req_lsn = modification.tline.get_last_record_lsn(); - for segno in modification - .tline - .list_slru_segments(SlruKind::Clog, req_lsn) - .no_ondemand_download()? - { + + let slru_segments = with_ondemand_download(|| { + modification + .tline + .list_slru_segments(SlruKind::Clog, req_lsn) + }) + .await?; + for segno in slru_segments { let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; if slru_may_delete_clogsegment(segpage, xlrec.pageno) { modification.drop_slru_segment(SlruKind::Clog, segno)?; @@ -944,27 +950,26 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn put_rel_page_image( + async fn put_rel_page_image( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rel: RelTag, blknum: BlockNumber, img: Bytes, - ) -> PageReconstructResult<()> { - try_no_ondemand_download!(self.handle_rel_extend(modification, rel, blknum)); - try_prr!(modification.put_rel_page_image(rel, blknum, img)); - PageReconstructResult::Success(()) + ) -> anyhow::Result<()> { + self.handle_rel_extend(modification, rel, blknum).await?; + modification.put_rel_page_image(rel, blknum, img)?; + Ok(()) } - fn put_rel_wal_record( + async fn put_rel_wal_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rel: RelTag, blknum: BlockNumber, rec: NeonWalRecord, - ) -> Result<()> { - self.handle_rel_extend(modification, rel, blknum) - .no_ondemand_download()?; + ) -> anyhow::Result<()> { + self.handle_rel_extend(modification, rel, blknum).await?; modification.put_rel_wal_record(rel, blknum, rec)?; Ok(()) } @@ -984,69 +989,67 @@ impl<'a> WalIngest<'a> { Ok(()) } - fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> anyhow::Result { - let nblocks = if !self - .timeline - .get_rel_exists(rel, lsn, true) - .no_ondemand_download()? - { + async fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> anyhow::Result { + let exists = + with_ondemand_download(|| self.timeline.get_rel_exists(rel, lsn, true)).await?; + let nblocks = if !exists { 0 } else { - self.timeline - .get_rel_size(rel, lsn, true) - .no_ondemand_download()? + with_ondemand_download(|| self.timeline.get_rel_size(rel, lsn, true)).await? }; Ok(nblocks) } - fn handle_rel_extend( + async fn handle_rel_extend( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, rel: RelTag, blknum: BlockNumber, - ) -> PageReconstructResult<()> { + ) -> anyhow::Result<()> { let new_nblocks = blknum + 1; // Check if the relation exists. We implicitly create relations on first // record. // TODO: would be nice if to be more explicit about it let last_lsn = modification.lsn; let old_nblocks = - if !try_no_ondemand_download!(self.timeline.get_rel_exists(rel, last_lsn, true)) { + if !with_ondemand_download(|| self.timeline.get_rel_exists(rel, last_lsn, true)).await? + { // create it with 0 size initially, the logic below will extend it - try_prr!(modification.put_rel_creation(rel, 0)); + modification.put_rel_creation(rel, 0)?; 0 } else { - try_no_ondemand_download!(self.timeline.get_rel_size(rel, last_lsn, true)) + with_ondemand_download(|| self.timeline.get_rel_size(rel, last_lsn, true)).await? }; if new_nblocks > old_nblocks { //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks); - try_prr!(modification.put_rel_extend(rel, new_nblocks)); + modification.put_rel_extend(rel, new_nblocks)?; // fill the gap with zeros for gap_blknum in old_nblocks..blknum { - try_prr!(modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())); + modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?; } } - PageReconstructResult::Success(()) + Ok(()) } - fn put_slru_page_image( + async fn put_slru_page_image( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, kind: SlruKind, segno: u32, blknum: BlockNumber, img: Bytes, - ) -> Result<()> { - self.handle_slru_extend(modification, kind, segno, blknum)?; + ) -> anyhow::Result<()> { + self.handle_slru_extend(modification, kind, segno, blknum) + .await?; modification.put_slru_page_image(kind, segno, blknum, img)?; Ok(()) } - fn handle_slru_extend( + async fn handle_slru_extend( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification<'_>, kind: SlruKind, segno: u32, blknum: BlockNumber, @@ -1060,18 +1063,17 @@ impl<'a> WalIngest<'a> { // record. // TODO: would be nice if to be more explicit about it let last_lsn = self.timeline.get_last_record_lsn(); - let old_nblocks = if !self - .timeline - .get_slru_segment_exists(kind, segno, last_lsn) - .no_ondemand_download()? + let old_nblocks = if !with_ondemand_download(|| { + self.timeline.get_slru_segment_exists(kind, segno, last_lsn) + }) + .await? { // create it with 0 size initially, the logic below will extend it modification.put_slru_segment_creation(kind, segno, 0)?; 0 } else { - self.timeline - .get_slru_segment_size(kind, segno, last_lsn) - .no_ondemand_download()? + with_ondemand_download(|| self.timeline.get_slru_segment_size(kind, segno, last_lsn)) + .await? }; if new_nblocks > old_nblocks { @@ -1119,12 +1121,12 @@ mod tests { static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); - fn init_walingest_test(tline: &Timeline) -> Result { + async fn init_walingest_test(tline: &Timeline) -> Result { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file m.commit()?; - let walingest = WalIngest::new(tline, Lsn(0x10)).no_ondemand_download()?; + let walingest = WalIngest::new(tline, Lsn(0x10)).await?; Ok(walingest) } @@ -1133,28 +1135,28 @@ mod tests { async fn test_relsize() -> Result<()> { let tenant = TenantHarness::create("test_relsize")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; - let mut walingest = init_walingest_test(&tline)?; + let mut walingest = init_walingest_test(&tline).await?; let mut m = tline.begin_modification(Lsn(0x20)); walingest.put_rel_creation(&mut m, TESTREL_A)?; walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2")) - .no_ondemand_download()?; + .await?; m.commit()?; let mut m = tline.begin_modification(Lsn(0x30)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3")) - .no_ondemand_download()?; + .await?; m.commit()?; let mut m = tline.begin_modification(Lsn(0x40)); walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4")) - .no_ondemand_download()?; + .await?; m.commit()?; let mut m = tline.begin_modification(Lsn(0x50)); walingest .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5")) - .no_ondemand_download()?; + .await?; m.commit()?; assert_current_logical_size(&tline, Lsn(0x50)); @@ -1292,7 +1294,7 @@ mod tests { let mut m = tline.begin_modification(Lsn(0x70)); walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1")) - .no_ondemand_download()?; + .await?; m.commit()?; assert_eq!( tline @@ -1317,7 +1319,7 @@ mod tests { let mut m = tline.begin_modification(Lsn(0x80)); walingest .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500")) - .no_ondemand_download()?; + .await?; m.commit()?; assert_eq!( tline @@ -1349,12 +1351,12 @@ mod tests { async fn test_drop_extend() -> Result<()> { let tenant = TenantHarness::create("test_drop_extend")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; - let mut walingest = init_walingest_test(&tline)?; + let mut walingest = init_walingest_test(&tline).await?; let mut m = tline.begin_modification(Lsn(0x20)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2")) - .no_ondemand_download()?; + .await?; m.commit()?; // Check that rel exists and size is correct @@ -1391,7 +1393,7 @@ mod tests { let mut m = tline.begin_modification(Lsn(0x40)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4")) - .no_ondemand_download()?; + .await?; m.commit()?; // Check that rel exists and size is correct @@ -1418,7 +1420,7 @@ mod tests { async fn test_truncate_extend() -> Result<()> { let tenant = TenantHarness::create("test_truncate_extend")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; - let mut walingest = init_walingest_test(&tline)?; + let mut walingest = init_walingest_test(&tline).await?; // Create a 20 MB relation (the size is arbitrary) let relsize = 20 * 1024 * 1024 / 8192; @@ -1427,7 +1429,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, Lsn(0x20)); walingest .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data)) - .no_ondemand_download()?; + .await?; } m.commit()?; @@ -1519,7 +1521,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); walingest .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data)) - .no_ondemand_download()?; + .await?; } m.commit()?; @@ -1556,7 +1558,7 @@ mod tests { async fn test_large_rel() -> Result<()> { let tenant = TenantHarness::create("test_large_rel")?.load().await; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; - let mut walingest = init_walingest_test(&tline)?; + let mut walingest = init_walingest_test(&tline).await?; let mut lsn = 0x10; for blknum in 0..RELSEG_SIZE + 1 { @@ -1565,7 +1567,7 @@ mod tests { let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn))); walingest .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img) - .no_ondemand_download()?; + .await?; m.commit()?; } diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 3753807327..06aa132365 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -20,9 +20,7 @@ use tokio::{pin, select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tracing::{debug, error, info, trace, warn}; -use crate::{ - metrics::LIVE_CONNECTIONS_COUNT, tenant::with_ondemand_download, walreceiver::TaskStateUpdate, -}; +use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate}; use crate::{ task_mgr, task_mgr::TaskKind, @@ -175,8 +173,7 @@ pub async fn handle_walreceiver_connection( let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version); - let mut walingest = - with_ondemand_download(|| WalIngest::new(timeline.as_ref(), startpoint)).await?; + let mut walingest = WalIngest::new(timeline.as_ref(), startpoint).await?; while let Some(replication_message) = { select! { @@ -251,16 +248,10 @@ pub async fn handle_walreceiver_connection( // at risk of hitting a deadlock. ensure!(lsn.is_aligned()); - with_ondemand_download(|| { - walingest.ingest_record( - recdata.clone(), - lsn, - &mut modification, - &mut decoded, - ) - }) - .await - .with_context(|| format!("could not ingest record at {lsn}"))?; + walingest + .ingest_record(recdata.clone(), lsn, &mut modification, &mut decoded) + .await + .with_context(|| format!("could not ingest record at {lsn}"))?; fail_point!("walreceiver-after-ingest");