Enable on-demand download in WalIngest. (#3233)

Makes the top-level functions in WalIngest async, and replaces
`no_ondemand_download` calls with `with_ondemand_download`.

This hopefully fixes the problem reported in issue #3230, although I
don't have a self-contained test case for it.
This commit is contained in:
Heikki Linnakangas
2023-01-03 14:44:42 +02:00
committed by GitHub
parent 0a0e55c3d0
commit 8b692e131b
4 changed files with 187 additions and 194 deletions

View File

@@ -214,10 +214,11 @@ where
let mut segment_data: Vec<u8> = vec![];
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
.no_ondemand_download()?;
let img = with_ondemand_download(|| {
self.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
})
.await?;
segment_data.extend_from_slice(&img[..]);
}
@@ -313,10 +314,8 @@ where
// XLOG_TBLSPC_DROP records. But we probably should just
// throw an error on CREATE TABLESPACE in the first place.
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, self.lsn)
.no_ondemand_download()?
&& with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
.await?
.is_empty()
{
return Ok(());

View File

@@ -99,7 +99,8 @@ pub async fn import_timeline_from_postgres_datadir(
tline,
Lsn(pg_control.checkPointCopy.redo),
pgdata_lsn,
)?;
)
.await?;
Ok(())
}
@@ -240,7 +241,7 @@ async fn import_slru(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal(
async fn import_wal(
walpath: &Path,
tline: &Timeline,
startpoint: Lsn,
@@ -253,7 +254,7 @@ fn import_wal(
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let mut walingest = WalIngest::new(tline, startpoint).no_ondemand_download()?;
let mut walingest = WalIngest::new(tline, startpoint).await?;
while last_lsn <= endpoint {
// FIXME: assume postgresql tli 1 for now
@@ -291,7 +292,7 @@ fn import_wal(
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.no_ondemand_download()?;
.await?;
last_lsn = lsn;
nrecords += 1;
@@ -375,7 +376,7 @@ pub async fn import_wal_from_tar(
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
let mut walingest = WalIngest::new(tline, start_lsn).no_ondemand_download()?;
let mut walingest = WalIngest::new(tline, start_lsn).await?;
// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
@@ -425,7 +426,7 @@ pub async fn import_wal_from_tar(
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.no_ondemand_download()?;
.await?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -21,7 +21,6 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoke Rust code.
use anyhow::Context;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -31,12 +30,10 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::pgdatadir_mapping::*;
use crate::tenant::PageReconstructResult;
use crate::tenant::Timeline;
use crate::try_page_reconstruct_result as try_prr;
use crate::tenant::{with_ondemand_download, PageReconstructError};
use crate::walrecord::*;
use crate::ZERO_PAGE;
use crate::{try_no_ondemand_download, try_page_reconstruct_result};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
@@ -55,16 +52,15 @@ pub struct WalIngest<'a> {
}
impl<'a> WalIngest<'a> {
pub fn new(timeline: &Timeline, startpoint: Lsn) -> PageReconstructResult<WalIngest> {
pub async fn new(timeline: &Timeline, startpoint: Lsn) -> anyhow::Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = try_no_ondemand_download!(timeline.get_checkpoint(startpoint));
let checkpoint = try_page_reconstruct_result!(
CheckPoint::decode(&checkpoint_bytes).context("Failed to decode checkpoint bytes")
);
let checkpoint_bytes =
with_ondemand_download(|| timeline.get_checkpoint(startpoint)).await?;
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
PageReconstructResult::Success(WalIngest {
Ok(WalIngest {
timeline,
checkpoint,
checkpoint_modified: false,
@@ -79,18 +75,15 @@ impl<'a> WalIngest<'a> {
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
/// relations/pages that the record affects.
///
pub fn ingest_record(
pub async fn ingest_record(
&mut self,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
) -> PageReconstructResult<()> {
) -> anyhow::Result<()> {
modification.lsn = lsn;
try_prr!(
decode_wal_record(recdata, decoded, self.timeline.pg_version)
.context("failed decoding wal record")
);
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -105,7 +98,8 @@ impl<'a> WalIngest<'a> {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
try_prr!(self.ingest_heapam_record(&mut buf, modification, decoded));
self.ingest_heapam_record(&mut buf, modification, decoded)
.await?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -113,13 +107,14 @@ impl<'a> WalIngest<'a> {
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
try_prr!(self.ingest_xlog_smgr_create(modification, &create));
self.ingest_xlog_smgr_create(modification, &create)?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
try_prr!(self.ingest_xlog_smgr_truncate(modification, &truncate));
self.ingest_xlog_smgr_truncate(modification, &truncate)
.await?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
debug!(
"handle RM_DBASE_ID for Postgres version {:?}",
@@ -132,14 +127,15 @@ impl<'a> WalIngest<'a> {
let createdb = XlCreateDatabase::decode(&mut buf);
debug!("XLOG_DBASE_CREATE v14");
try_prr!(self.ingest_xlog_dbase_create(modification, &createdb));
self.ingest_xlog_dbase_create(modification, &createdb)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
try_prr!(modification.drop_dbdir(tablespace_id, dropdb.db_id));
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
} else if self.timeline.pg_version == 15 {
@@ -155,14 +151,15 @@ impl<'a> WalIngest<'a> {
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
try_prr!(self.ingest_xlog_dbase_create(modification, &createdb));
self.ingest_xlog_dbase_create(modification, &createdb)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
try_prr!(modification.drop_dbdir(tablespace_id, dropdb.db_id));
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
}
@@ -174,38 +171,42 @@ impl<'a> WalIngest<'a> {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
try_prr!(self.put_slru_page_image(
self.put_slru_page_image(
modification,
SlruKind::Clog,
segno,
rpageno,
ZERO_PAGE.clone(),
));
)
.await?;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
try_prr!(self.ingest_clog_truncate_record(modification, &xlrec));
self.ingest_clog_truncate_record(modification, &xlrec)
.await?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
try_prr!(self.ingest_xact_record(
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
));
)
.await?;
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
try_prr!(self.ingest_xact_record(
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
));
)
.await?;
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
@@ -213,10 +214,9 @@ impl<'a> WalIngest<'a> {
parsed_xact.xid,
lsn,
);
try_prr!(modification.drop_twophase_file(parsed_xact.xid));
modification.drop_twophase_file(parsed_xact.xid)?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
try_prr!(modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..])));
modification.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]))?;
}
} else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -225,34 +225,36 @@ impl<'a> WalIngest<'a> {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
try_prr!(self.put_slru_page_image(
self.put_slru_page_image(
modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
ZERO_PAGE.clone(),
));
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
try_prr!(self.put_slru_page_image(
self.put_slru_page_image(
modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
ZERO_PAGE.clone(),
));
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
try_prr!(self.ingest_multixact_create_record(modification, &xlrec));
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
try_prr!(self.ingest_multixact_truncate_record(modification, &xlrec));
self.ingest_multixact_truncate_record(modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
try_prr!(self.ingest_relmap_page(modification, &xlrec, decoded));
self.ingest_relmap_page(modification, &xlrec, decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -266,9 +268,7 @@ impl<'a> WalIngest<'a> {
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = try_prr!(
CheckPoint::decode(&checkpoint_bytes).context("deserialize CheckPoint")
);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
@@ -289,32 +289,32 @@ impl<'a> WalIngest<'a> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
try_no_ondemand_download!(self.ingest_decoded_block(modification, lsn, decoded, blk));
self.ingest_decoded_block(modification, lsn, decoded, blk)
.await?;
}
// If checkpoint data was updated, store the new version in the repository
if self.checkpoint_modified {
let new_checkpoint_bytes =
try_prr!(self.checkpoint.encode().context("encode checkpoint"));
let new_checkpoint_bytes = self.checkpoint.encode()?;
try_prr!(modification.put_checkpoint(new_checkpoint_bytes));
modification.put_checkpoint(new_checkpoint_bytes)?;
self.checkpoint_modified = false;
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
try_prr!(modification.commit());
modification.commit()?;
PageReconstructResult::Success(())
Ok(())
}
fn ingest_decoded_block(
async fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
lsn: Lsn,
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
) -> PageReconstructResult<()> {
) -> Result<(), PageReconstructError> {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
@@ -334,7 +334,7 @@ impl<'a> WalIngest<'a> {
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !try_prr!(postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version))
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
@@ -356,28 +356,25 @@ impl<'a> WalIngest<'a> {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);
try_no_ondemand_download!(self.put_rel_page_image(
modification,
rel,
blk.blkno,
image.freeze()
));
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())
.await?;
} else {
let rec = NeonWalRecord::Postgres {
will_init: blk.will_init || blk.apply_image,
rec: decoded.record.clone(),
};
try_prr!(self.put_rel_wal_record(modification, rel, blk.blkno, rec));
self.put_rel_wal_record(modification, rel, blk.blkno, rec)
.await?;
}
PageReconstructResult::Success(())
Ok(())
}
fn ingest_heapam_record(
async fn ingest_heapam_record(
&mut self,
buf: &mut Bytes,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
) -> anyhow::Result<()> {
// Handle VM bit updates that are implicitly part of heap records.
// First, look at the record to determine which VM bits need
@@ -456,7 +453,7 @@ impl<'a> WalIngest<'a> {
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
let vm_size = self.get_relsize(vm_rel, modification.lsn)?;
let vm_size = self.get_relsize(vm_rel, modification.lsn).await?;
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
new_vm_blk = None;
@@ -481,7 +478,8 @@ impl<'a> WalIngest<'a> {
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
},
)?;
)
.await?;
} else {
// Clear VM bits for one heap page, or for two pages that reside on
// different VM pages.
@@ -495,7 +493,8 @@ impl<'a> WalIngest<'a> {
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
},
)?;
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
@@ -507,7 +506,8 @@ impl<'a> WalIngest<'a> {
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
},
)?;
)
.await?;
}
}
}
@@ -517,9 +517,9 @@ impl<'a> WalIngest<'a> {
}
/// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
fn ingest_xlog_dbase_create(
async fn ingest_xlog_dbase_create(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rec: &XlCreateDatabase,
) -> anyhow::Result<()> {
let db_id = rec.db_id;
@@ -534,18 +534,22 @@ impl<'a> WalIngest<'a> {
// get calls instead.
let req_lsn = modification.tline.get_last_record_lsn();
let rels = modification
.tline
.list_rels(src_tablespace_id, src_db_id, req_lsn)
.no_ondemand_download()?;
let rels = with_ondemand_download(|| {
modification
.tline
.list_rels(src_tablespace_id, src_db_id, req_lsn)
})
.await?;
debug!("ingest_xlog_dbase_create: {} rels", rels.len());
// Copy relfilemap
let filemap = modification
.tline
.get_relmap_file(src_tablespace_id, src_db_id, req_lsn)
.no_ondemand_download()?;
let filemap = with_ondemand_download(|| {
modification
.tline
.get_relmap_file(src_tablespace_id, src_db_id, req_lsn)
})
.await?;
modification.put_relmap_file(tablespace_id, db_id, filemap)?;
let mut num_rels_copied = 0;
@@ -554,10 +558,9 @@ impl<'a> WalIngest<'a> {
assert_eq!(src_rel.spcnode, src_tablespace_id);
assert_eq!(src_rel.dbnode, src_db_id);
let nblocks = modification
.tline
.get_rel_size(src_rel, req_lsn, true)
.no_ondemand_download()?;
let nblocks =
with_ondemand_download(|| modification.tline.get_rel_size(src_rel, req_lsn, true))
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
dbnode: db_id,
@@ -572,10 +575,12 @@ impl<'a> WalIngest<'a> {
for blknum in 0..nblocks {
debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
let content = modification
.tline
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true)
.no_ondemand_download()?;
let content = with_ondemand_download(|| {
modification
.tline
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true)
})
.await?;
modification.put_rel_page_image(dst_rel, blknum, content)?;
num_blocks_copied += 1;
}
@@ -594,7 +599,7 @@ impl<'a> WalIngest<'a> {
&mut self,
modification: &mut DatadirModification,
rec: &XlSmgrCreate,
) -> Result<()> {
) -> anyhow::Result<()> {
let rel = RelTag {
spcnode: rec.rnode.spcnode,
dbnode: rec.rnode.dbnode,
@@ -608,11 +613,11 @@ impl<'a> WalIngest<'a> {
/// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
///
/// This is the same logic as in PostgreSQL's smgr_redo() function.
fn ingest_xlog_smgr_truncate(
async fn ingest_xlog_smgr_truncate(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rec: &XlSmgrTruncate,
) -> Result<()> {
) -> anyhow::Result<()> {
let spcnode = rec.rnode.spcnode;
let dbnode = rec.rnode.dbnode;
let relnode = rec.rnode.relnode;
@@ -642,7 +647,7 @@ impl<'a> WalIngest<'a> {
modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
fsm_physical_page_no += 1;
}
let nblocks = self.get_relsize(rel, modification.lsn)?;
let nblocks = self.get_relsize(rel, modification.lsn).await?;
if nblocks > fsm_physical_page_no {
// check if something to do: FSM is larger than truncate position
self.put_rel_truncation(modification, rel, fsm_physical_page_no)?;
@@ -663,7 +668,7 @@ impl<'a> WalIngest<'a> {
modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
vm_page_no += 1;
}
let nblocks = self.get_relsize(rel, modification.lsn)?;
let nblocks = self.get_relsize(rel, modification.lsn).await?;
if nblocks > vm_page_no {
// check if something to do: VM is larger than truncate position
self.put_rel_truncation(modification, rel, vm_page_no)?;
@@ -674,9 +679,9 @@ impl<'a> WalIngest<'a> {
/// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
///
fn ingest_xact_record(
async fn ingest_xact_record(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
parsed: &XlXactParsedRecord,
is_commit: bool,
) -> anyhow::Result<()> {
@@ -735,10 +740,8 @@ impl<'a> WalIngest<'a> {
relnode: xnode.relnode,
};
let last_lsn = self.timeline.get_last_record_lsn();
if modification
.tline
.get_rel_exists(rel, last_lsn, true)
.no_ondemand_download()?
if with_ondemand_download(|| modification.tline.get_rel_exists(rel, last_lsn, true))
.await?
{
self.put_rel_drop(modification, rel)?;
}
@@ -747,9 +750,9 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn ingest_clog_truncate_record(
async fn ingest_clog_truncate_record(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
xlrec: &XlClogTruncate,
) -> anyhow::Result<()> {
info!(
@@ -791,11 +794,14 @@ impl<'a> WalIngest<'a> {
// it. So we use the previous record's LSN in the get calls
// instead.
let req_lsn = modification.tline.get_last_record_lsn();
for segno in modification
.tline
.list_slru_segments(SlruKind::Clog, req_lsn)
.no_ondemand_download()?
{
let slru_segments = with_ondemand_download(|| {
modification
.tline
.list_slru_segments(SlruKind::Clog, req_lsn)
})
.await?;
for segno in slru_segments {
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
modification.drop_slru_segment(SlruKind::Clog, segno)?;
@@ -944,27 +950,26 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn put_rel_page_image(
async fn put_rel_page_image(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
) -> PageReconstructResult<()> {
try_no_ondemand_download!(self.handle_rel_extend(modification, rel, blknum));
try_prr!(modification.put_rel_page_image(rel, blknum, img));
PageReconstructResult::Success(())
) -> anyhow::Result<()> {
self.handle_rel_extend(modification, rel, blknum).await?;
modification.put_rel_page_image(rel, blknum, img)?;
Ok(())
}
fn put_rel_wal_record(
async fn put_rel_wal_record(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rel: RelTag,
blknum: BlockNumber,
rec: NeonWalRecord,
) -> Result<()> {
self.handle_rel_extend(modification, rel, blknum)
.no_ondemand_download()?;
) -> anyhow::Result<()> {
self.handle_rel_extend(modification, rel, blknum).await?;
modification.put_rel_wal_record(rel, blknum, rec)?;
Ok(())
}
@@ -984,69 +989,67 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> anyhow::Result<BlockNumber> {
let nblocks = if !self
.timeline
.get_rel_exists(rel, lsn, true)
.no_ondemand_download()?
{
async fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> anyhow::Result<BlockNumber> {
let exists =
with_ondemand_download(|| self.timeline.get_rel_exists(rel, lsn, true)).await?;
let nblocks = if !exists {
0
} else {
self.timeline
.get_rel_size(rel, lsn, true)
.no_ondemand_download()?
with_ondemand_download(|| self.timeline.get_rel_size(rel, lsn, true)).await?
};
Ok(nblocks)
}
fn handle_rel_extend(
async fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rel: RelTag,
blknum: BlockNumber,
) -> PageReconstructResult<()> {
) -> anyhow::Result<()> {
let new_nblocks = blknum + 1;
// Check if the relation exists. We implicitly create relations on first
// record.
// TODO: it would be nice to be more explicit about it
let last_lsn = modification.lsn;
let old_nblocks =
if !try_no_ondemand_download!(self.timeline.get_rel_exists(rel, last_lsn, true)) {
if !with_ondemand_download(|| self.timeline.get_rel_exists(rel, last_lsn, true)).await?
{
// create it with 0 size initially, the logic below will extend it
try_prr!(modification.put_rel_creation(rel, 0));
modification.put_rel_creation(rel, 0)?;
0
} else {
try_no_ondemand_download!(self.timeline.get_rel_size(rel, last_lsn, true))
with_ondemand_download(|| self.timeline.get_rel_size(rel, last_lsn, true)).await?
};
if new_nblocks > old_nblocks {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
try_prr!(modification.put_rel_extend(rel, new_nblocks));
modification.put_rel_extend(rel, new_nblocks)?;
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
try_prr!(modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone()));
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}
PageReconstructResult::Success(())
Ok(())
}
fn put_slru_page_image(
async fn put_slru_page_image(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
img: Bytes,
) -> Result<()> {
self.handle_slru_extend(modification, kind, segno, blknum)?;
) -> anyhow::Result<()> {
self.handle_slru_extend(modification, kind, segno, blknum)
.await?;
modification.put_slru_page_image(kind, segno, blknum, img)?;
Ok(())
}
fn handle_slru_extend(
async fn handle_slru_extend(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -1060,18 +1063,17 @@ impl<'a> WalIngest<'a> {
// record.
// TODO: it would be nice to be more explicit about it
let last_lsn = self.timeline.get_last_record_lsn();
let old_nblocks = if !self
.timeline
.get_slru_segment_exists(kind, segno, last_lsn)
.no_ondemand_download()?
let old_nblocks = if !with_ondemand_download(|| {
self.timeline.get_slru_segment_exists(kind, segno, last_lsn)
})
.await?
{
// create it with 0 size initially, the logic below will extend it
modification.put_slru_segment_creation(kind, segno, 0)?;
0
} else {
self.timeline
.get_slru_segment_size(kind, segno, last_lsn)
.no_ondemand_download()?
with_ondemand_download(|| self.timeline.get_slru_segment_size(kind, segno, last_lsn))
.await?
};
if new_nblocks > old_nblocks {
@@ -1119,12 +1121,12 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test(tline: &Timeline) -> Result<WalIngest> {
async fn init_walingest_test(tline: &Timeline) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.commit()?;
let walingest = WalIngest::new(tline, Lsn(0x10)).no_ondemand_download()?;
let walingest = WalIngest::new(tline, Lsn(0x10)).await?;
Ok(walingest)
}
@@ -1133,28 +1135,28 @@ mod tests {
async fn test_relsize() -> Result<()> {
let tenant = TenantHarness::create("test_relsize")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&tline).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))
.no_ondemand_download()?;
.await?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))
.no_ondemand_download()?;
.await?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))
.no_ondemand_download()?;
.await?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))
.no_ondemand_download()?;
.await?;
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1292,7 +1294,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x70));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))
.no_ondemand_download()?;
.await?;
m.commit()?;
assert_eq!(
tline
@@ -1317,7 +1319,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x80));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))
.no_ondemand_download()?;
.await?;
m.commit()?;
assert_eq!(
tline
@@ -1349,12 +1351,12 @@ mod tests {
async fn test_drop_extend() -> Result<()> {
let tenant = TenantHarness::create("test_drop_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&tline).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))
.no_ondemand_download()?;
.await?;
m.commit()?;
// Check that rel exists and size is correct
@@ -1391,7 +1393,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))
.no_ondemand_download()?;
.await?;
m.commit()?;
// Check that rel exists and size is correct
@@ -1418,7 +1420,7 @@ mod tests {
async fn test_truncate_extend() -> Result<()> {
let tenant = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&tline).await?;
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
@@ -1427,7 +1429,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
walingest
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))
.no_ondemand_download()?;
.await?;
}
m.commit()?;
@@ -1519,7 +1521,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
walingest
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))
.no_ondemand_download()?;
.await?;
}
m.commit()?;
@@ -1556,7 +1558,7 @@ mod tests {
async fn test_large_rel() -> Result<()> {
let tenant = TenantHarness::create("test_large_rel")?.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&tline).await?;
let mut lsn = 0x10;
for blknum in 0..RELSEG_SIZE + 1 {
@@ -1565,7 +1567,7 @@ mod tests {
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)
.no_ondemand_download()?;
.await?;
m.commit()?;
}

View File

@@ -20,9 +20,7 @@ use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tracing::{debug, error, info, trace, warn};
use crate::{
metrics::LIVE_CONNECTIONS_COUNT, tenant::with_ondemand_download, walreceiver::TaskStateUpdate,
};
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use crate::{
task_mgr,
task_mgr::TaskKind,
@@ -175,8 +173,7 @@ pub async fn handle_walreceiver_connection(
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest =
with_ondemand_download(|| WalIngest::new(timeline.as_ref(), startpoint)).await?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint).await?;
while let Some(replication_message) = {
select! {
@@ -251,16 +248,10 @@ pub async fn handle_walreceiver_connection(
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
with_ondemand_download(|| {
walingest.ingest_record(
recdata.clone(),
lsn,
&mut modification,
&mut decoded,
)
})
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
walingest
.ingest_record(recdata.clone(), lsn, &mut modification, &mut decoded)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
fail_point!("walreceiver-after-ingest");