Push on-demand download into Timeline::get() itself.

This makes Timeline::get() async, along with all functions that call it,
directly or indirectly. The with_ondemand_download() mechanism is gone:
Timeline::get() now always downloads missing layer files on demand,
whether you want it to or not. That is what all the current callers want,
so although this loses the ability to read a page only if it is already
present locally on the pageserver, without downloading, we were not using
that capability. A few places in the WAL ingestion code used
'no_ondemand_download' and would error out if a layer file was not found
locally, but those were dubious; we actually do want to download on
demand in all of those places.
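
A minimal before/after sketch of the caller-side change (simplified from
the hunks below; 'timeline', 'key' and 'lsn' are placeholders, error
handling elided):

    // Before: Timeline::get() was sync and signalled missing layers via
    // PageReconstructResult::NeedsDownload, so async callers wrapped it:
    let img = with_ondemand_download(|| timeline.get(key, lsn)).await?;

    // After: Timeline::get() is async and downloads any missing remote
    // layer itself before reconstructing the page:
    let img = timeline.get(key, lsn).await?;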

Per discussion at
https://github.com/neondatabase/neon/pull/3233#issuecomment-1368032358
Heikki Linnakangas
2023-01-11 22:24:02 +02:00
committed by Heikki Linnakangas
parent 95bf19b85a
commit c1731bc4f0
10 changed files with 489 additions and 693 deletions

View File

@@ -27,7 +27,7 @@ use tracing::*;
///
use tokio_tar::{Builder, EntryType, Header};
use crate::tenant::{with_ondemand_download, Timeline};
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
@@ -171,30 +171,23 @@ where
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in
with_ondemand_download(|| self.timeline.list_slru_segments(kind, self.lsn)).await?
{
for segno in self.timeline.list_slru_segments(kind, self.lsn).await? {
self.add_slru_segment(kind, segno).await?;
}
}
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
with_ondemand_download(|| self.timeline.list_dbdirs(self.lsn)).await?
{
for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn).await? {
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in
with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
.await?
{
for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn).await? {
self.add_rel(rel).await?;
}
}
}
for xid in with_ondemand_download(|| self.timeline.list_twophase_files(self.lsn)).await? {
for xid in self.timeline.list_twophase_files(self.lsn).await? {
self.add_twophase_file(xid).await?;
}
@@ -210,8 +203,7 @@ where
}
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
let nblocks =
with_ondemand_download(|| self.timeline.get_rel_size(tag, self.lsn, false)).await?;
let nblocks = self.timeline.get_rel_size(tag, self.lsn, false).await?;
// If the relation is empty, create an empty file
if nblocks == 0 {
@@ -229,11 +221,10 @@ where
let mut segment_data: Vec<u8> = vec![];
for blknum in startblk..endblk {
let img = with_ondemand_download(|| {
self.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
})
.await?;
let img = self
.timeline
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
.await?;
segment_data.extend_from_slice(&img[..]);
}
@@ -252,17 +243,17 @@ where
// Generate SLRU segment files from repository.
//
async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
let nblocks =
with_ondemand_download(|| self.timeline.get_slru_segment_size(slru, segno, self.lsn))
.await?;
let nblocks = self
.timeline
.get_slru_segment_size(slru, segno, self.lsn)
.await?;
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
for blknum in 0..nblocks {
let img = with_ondemand_download(|| {
self.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
})
.await?;
let img = self
.timeline
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
.await?;
if slru == SlruKind::Clog {
ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
@@ -294,9 +285,10 @@ where
has_relmap_file: bool,
) -> anyhow::Result<()> {
let relmap_img = if has_relmap_file {
let img =
with_ondemand_download(|| self.timeline.get_relmap_file(spcnode, dbnode, self.lsn))
.await?;
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, self.lsn)
.await?;
ensure!(img.len() == 512);
Some(img)
} else {
@@ -329,7 +321,9 @@ where
// XLOG_TBLSPC_DROP records. But we probably should just
// throw an error on CREATE TABLESPACE in the first place.
if !has_relmap_file
&& with_ondemand_download(|| self.timeline.list_rels(spcnode, dbnode, self.lsn))
&& self
.timeline
.list_rels(spcnode, dbnode, self.lsn)
.await?
.is_empty()
{
@@ -362,7 +356,7 @@ where
// Extract twophase state files
//
async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
let img = with_ondemand_download(|| self.timeline.get_twophase_file(xid, self.lsn)).await?;
let img = self.timeline.get_twophase_file(xid, self.lsn).await?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
@@ -398,10 +392,14 @@ where
)
.await?;
let checkpoint_bytes = with_ondemand_download(|| self.timeline.get_checkpoint(self.lsn))
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = with_ondemand_download(|| self.timeline.get_control_file(self.lsn))
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.await
.context("failed get control bytes")?;

View File

@@ -13,7 +13,7 @@ use super::models::{
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{with_ondemand_download, Timeline};
use crate::tenant::{PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
auth::JwtAuth,
@@ -77,6 +77,15 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
})
}
fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
match err {
PageReconstructError::Other(err) => ApiError::InternalServerError(err),
PageReconstructError::WalRedo(err) => {
ApiError::InternalServerError(anyhow::Error::new(err))
}
}
}
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
timeline: &Arc<Timeline>,
@@ -298,9 +307,10 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.map_err(ApiError::NotFound)?;
let result = with_ondemand_download(|| timeline.find_lsn_for_timestamp(timestamp_pg))
let result = timeline
.find_lsn_for_timestamp(timestamp_pg)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(apierror_from_prerror)?;
let result = match result {
LsnForTimestamp::Present(lsn) => format!("{lsn}"),

View File

@@ -143,7 +143,7 @@ async fn import_rel(
// Call put_rel_creation for every segment of the relation,
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32).await {
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
@@ -178,7 +178,7 @@ async fn import_rel(
//
// If we process rel segments out of order,
// put_rel_extend will skip the update.
modification.put_rel_extend(rel, blknum)?;
modification.put_rel_extend(rel, blknum).await?;
Ok(())
}
@@ -206,7 +206,9 @@ async fn import_slru(
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
modification.put_slru_segment_creation(slru, segno, nblocks as u32)?;
modification
.put_slru_segment_creation(slru, segno, nblocks as u32)
.await?;
let mut rpageno = 0;
loop {
@@ -492,7 +494,7 @@ async fn import_file(
}
"pg_filenode.map" => {
let bytes = read_all_bytes(reader).await?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
modification.put_relmap_file(spcnode, dbnode, bytes).await?;
debug!("imported relmap file")
}
"PG_VERSION" => {
@@ -515,7 +517,7 @@ async fn import_file(
match file_name.as_ref() {
"pg_filenode.map" => {
let bytes = read_all_bytes(reader).await?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
modification.put_relmap_file(spcnode, dbnode, bytes).await?;
debug!("imported relmap file")
}
"PG_VERSION" => {
@@ -545,7 +547,9 @@ async fn import_file(
let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
let bytes = read_all_bytes(reader).await?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
modification
.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))
.await?;
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");

View File

@@ -546,10 +546,7 @@ impl PageServerHandler {
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
.await?;
let exists = crate::tenant::with_ondemand_download(|| {
timeline.get_rel_exists(req.rel, lsn, req.latest)
})
.await?;
let exists = timeline.get_rel_exists(req.rel, lsn, req.latest).await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
exists,
@@ -566,10 +563,7 @@ impl PageServerHandler {
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
.await?;
let n_blocks = crate::tenant::with_ondemand_download(|| {
timeline.get_rel_size(req.rel, lsn, req.latest)
})
.await?;
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest).await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
n_blocks,
@@ -586,10 +580,9 @@ impl PageServerHandler {
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
.await?;
let total_blocks = crate::tenant::with_ondemand_download(|| {
timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
})
.await?;
let total_blocks = timeline
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
@@ -615,10 +608,9 @@ impl PageServerHandler {
}
*/
let page = crate::tenant::with_ondemand_download(|| {
timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
})
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,

View File

@@ -6,11 +6,10 @@
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use super::tenant::PageReconstructResult;
use super::tenant::{PageReconstructError, Timeline};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::tenant::{with_ondemand_download, Timeline};
use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use crate::{repository::*, try_no_ondemand_download};
use anyhow::Context;
use bytes::{Buf, Bytes};
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -92,76 +91,80 @@ impl Timeline {
//------------------------------------------------------------------------------
/// Look up given page version.
pub fn get_rel_page_at_lsn(
pub async fn get_rel_page_at_lsn(
&self,
tag: RelTag,
blknum: BlockNumber,
lsn: Lsn,
latest: bool,
) -> PageReconstructResult<Bytes> {
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return PageReconstructResult::from(anyhow::anyhow!("invalid relnode"));
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
}
let nblocks = try_no_ondemand_download!(self.get_rel_size(tag, lsn, latest));
let nblocks = self.get_rel_size(tag, lsn, latest).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, lsn, nblocks
);
return PageReconstructResult::Success(ZERO_PAGE.clone());
return Ok(ZERO_PAGE.clone());
}
let key = rel_block_to_key(tag, blknum);
self.get(key, lsn)
self.get(key, lsn).await
}
// Get size of a database in blocks
pub fn get_db_size(
pub async fn get_db_size(
&self,
spcnode: Oid,
dbnode: Oid,
lsn: Lsn,
latest: bool,
) -> PageReconstructResult<usize> {
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
let rels = try_no_ondemand_download!(self.list_rels(spcnode, dbnode, lsn));
let rels = self.list_rels(spcnode, dbnode, lsn).await?;
for rel in rels {
let n_blocks = try_no_ondemand_download!(self.get_rel_size(rel, lsn, latest));
let n_blocks = self.get_rel_size(rel, lsn, latest).await?;
total_blocks += n_blocks as usize;
}
PageReconstructResult::Success(total_blocks)
Ok(total_blocks)
}
/// Get size of a relation file
pub fn get_rel_size(
pub async fn get_rel_size(
&self,
tag: RelTag,
lsn: Lsn,
latest: bool,
) -> PageReconstructResult<BlockNumber> {
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
return PageReconstructResult::from(anyhow::anyhow!("invalid relnode"));
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
return PageReconstructResult::Success(nblocks);
return Ok(nblocks);
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !try_no_ondemand_download!(self.get_rel_exists(tag, lsn, latest))
&& !self.get_rel_exists(tag, lsn, latest).await?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
// FSM, and smgrnblocks() on it immediately afterwards,
// without extending it. Tolerate that by claiming that
// any non-existent FSM fork has size 0.
return PageReconstructResult::Success(0);
return Ok(0);
}
let key = rel_size_to_key(tag);
let mut buf = try_no_ondemand_download!(self.get(key, lsn));
let mut buf = self.get(key, lsn).await?;
let nblocks = buf.get_u32_le();
if latest {
@@ -174,47 +177,49 @@ impl Timeline {
// associated with most recent value of LSN.
self.update_cached_rel_size(tag, lsn, nblocks);
}
PageReconstructResult::Success(nblocks)
Ok(nblocks)
}
/// Does relation exist?
pub fn get_rel_exists(
pub async fn get_rel_exists(
&self,
tag: RelTag,
lsn: Lsn,
_latest: bool,
) -> PageReconstructResult<bool> {
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
return PageReconstructResult::from(anyhow::anyhow!("invalid relnode"));
return Err(PageReconstructError::Other(anyhow::anyhow!(
"invalid relnode"
)));
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
return PageReconstructResult::Success(true);
return Ok(true);
}
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = try_no_ondemand_download!(self.get(key, lsn));
let buf = self.get(key, lsn).await?;
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
PageReconstructResult::Success(exists)
Ok(exists)
}
Err(e) => PageReconstructResult::from(e),
Err(e) => Err(PageReconstructError::from(e)),
}
}
/// Get a list of all existing relations in given tablespace and database.
pub fn list_rels(
pub async fn list_rels(
&self,
spcnode: Oid,
dbnode: Oid,
lsn: Lsn,
) -> PageReconstructResult<HashSet<RelTag>> {
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = try_no_ondemand_download!(self.get(key, lsn));
let buf = self.get(key, lsn).await?;
match RelDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
@@ -226,53 +231,53 @@ impl Timeline {
forknum: *forknum,
}));
PageReconstructResult::Success(rels)
Ok(rels)
}
Err(e) => PageReconstructResult::from(e),
Err(e) => Err(PageReconstructError::from(e)),
}
}
/// Look up given SLRU page version.
pub fn get_slru_page_at_lsn(
pub async fn get_slru_page_at_lsn(
&self,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
lsn: Lsn,
) -> PageReconstructResult<Bytes> {
) -> Result<Bytes, PageReconstructError> {
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn)
self.get(key, lsn).await
}
/// Get size of an SLRU segment
pub fn get_slru_segment_size(
pub async fn get_slru_segment_size(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
) -> PageReconstructResult<BlockNumber> {
) -> Result<BlockNumber, PageReconstructError> {
let key = slru_segment_size_to_key(kind, segno);
let mut buf = try_no_ondemand_download!(self.get(key, lsn));
PageReconstructResult::Success(buf.get_u32_le())
let mut buf = self.get(key, lsn).await?;
Ok(buf.get_u32_le())
}
/// Get size of an SLRU segment
pub fn get_slru_segment_exists(
pub async fn get_slru_segment_exists(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
) -> PageReconstructResult<bool> {
) -> Result<bool, PageReconstructError> {
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = try_no_ondemand_download!(self.get(key, lsn));
let buf = self.get(key, lsn).await?;
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => {
let exists = dir.segments.get(&segno).is_some();
PageReconstructResult::Success(exists)
Ok(exists)
}
Err(e) => PageReconstructResult::from(e),
Err(e) => Err(PageReconstructError::from(e)),
}
}
@@ -283,10 +288,10 @@ impl Timeline {
/// so it's not well defined which LSN you get if there were multiple commits
/// "in flight" at that point in time.
///
pub fn find_lsn_for_timestamp(
pub async fn find_lsn_for_timestamp(
&self,
search_timestamp: TimestampTz,
) -> PageReconstructResult<LsnForTimestamp> {
) -> Result<LsnForTimestamp, PageReconstructError> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let min_lsn = *gc_cutoff_lsn_guard;
let max_lsn = self.get_last_record_lsn();
@@ -302,12 +307,14 @@ impl Timeline {
// cannot overflow, high and low are both smaller than u64::MAX / 2
let mid = (high + low) / 2;
let cmp = try_no_ondemand_download!(self.is_latest_commit_timestamp_ge_than(
search_timestamp,
Lsn(mid * 8),
&mut found_smaller,
&mut found_larger,
));
let cmp = self
.is_latest_commit_timestamp_ge_than(
search_timestamp,
Lsn(mid * 8),
&mut found_smaller,
&mut found_larger,
)
.await?;
if cmp {
high = mid;
@@ -319,15 +326,15 @@ impl Timeline {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
// just after importing a cluster.
PageReconstructResult::Success(LsnForTimestamp::NoData(max_lsn))
Ok(LsnForTimestamp::NoData(max_lsn))
}
(true, false) => {
// Didn't find any commit timestamps larger than the request
PageReconstructResult::Success(LsnForTimestamp::Future(max_lsn))
Ok(LsnForTimestamp::Future(max_lsn))
}
(false, true) => {
// Didn't find any commit timestamps smaller than the request
PageReconstructResult::Success(LsnForTimestamp::Past(max_lsn))
Ok(LsnForTimestamp::Past(max_lsn))
}
(true, true) => {
// low is the LSN of the first commit record *after* the search_timestamp,
@@ -337,7 +344,7 @@ impl Timeline {
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
PageReconstructResult::Success(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
}
}
}
@@ -349,26 +356,21 @@ impl Timeline {
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
/// with a smaller/larger timestamp.
///
pub fn is_latest_commit_timestamp_ge_than(
pub async fn is_latest_commit_timestamp_ge_than(
&self,
search_timestamp: TimestampTz,
probe_lsn: Lsn,
found_smaller: &mut bool,
found_larger: &mut bool,
) -> PageReconstructResult<bool> {
for segno in try_no_ondemand_download!(self.list_slru_segments(SlruKind::Clog, probe_lsn)) {
let nblocks = try_no_ondemand_download!(self.get_slru_segment_size(
SlruKind::Clog,
segno,
probe_lsn
));
) -> Result<bool, PageReconstructError> {
for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn).await? {
let nblocks = self
.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)
.await?;
for blknum in (0..nblocks).rev() {
let clog_page = try_no_ondemand_download!(self.get_slru_page_at_lsn(
SlruKind::Clog,
segno,
blknum,
probe_lsn
));
let clog_page = self
.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)
.await?;
if clog_page.len() == BLCKSZ as usize + 8 {
let mut timestamp_bytes = [0u8; 8];
@@ -377,76 +379,85 @@ impl Timeline {
if timestamp >= search_timestamp {
*found_larger = true;
return PageReconstructResult::Success(true);
return Ok(true);
} else {
*found_smaller = true;
}
}
}
}
PageReconstructResult::Success(false)
Ok(false)
}
/// Get a list of SLRU segments
pub fn list_slru_segments(
pub async fn list_slru_segments(
&self,
kind: SlruKind,
lsn: Lsn,
) -> PageReconstructResult<HashSet<u32>> {
) -> Result<HashSet<u32>, PageReconstructError> {
// fetch directory entry
let key = slru_dir_to_key(kind);
let buf = try_no_ondemand_download!(self.get(key, lsn));
let buf = self.get(key, lsn).await?;
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => PageReconstructResult::Success(dir.segments),
Err(e) => PageReconstructResult::from(e),
Ok(dir) => Ok(dir.segments),
Err(e) => Err(PageReconstructError::from(e)),
}
}
pub fn get_relmap_file(
pub async fn get_relmap_file(
&self,
spcnode: Oid,
dbnode: Oid,
lsn: Lsn,
) -> PageReconstructResult<Bytes> {
) -> Result<Bytes, PageReconstructError> {
let key = relmap_file_key(spcnode, dbnode);
let buf = try_no_ondemand_download!(self.get(key, lsn));
PageReconstructResult::Success(buf)
self.get(key, lsn).await
}
pub fn list_dbdirs(&self, lsn: Lsn) -> PageReconstructResult<HashMap<(Oid, Oid), bool>> {
pub async fn list_dbdirs(
&self,
lsn: Lsn,
) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
// fetch directory entry
let buf = try_no_ondemand_download!(self.get(DBDIR_KEY, lsn));
let buf = self.get(DBDIR_KEY, lsn).await?;
match DbDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => PageReconstructResult::Success(dir.dbdirs),
Err(e) => PageReconstructResult::from(e),
Ok(dir) => Ok(dir.dbdirs),
Err(e) => Err(PageReconstructError::from(e)),
}
}
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> PageReconstructResult<Bytes> {
pub async fn get_twophase_file(
&self,
xid: TransactionId,
lsn: Lsn,
) -> Result<Bytes, PageReconstructError> {
let key = twophase_file_key(xid);
let buf = try_no_ondemand_download!(self.get(key, lsn));
PageReconstructResult::Success(buf)
let buf = self.get(key, lsn).await?;
Ok(buf)
}
pub fn list_twophase_files(&self, lsn: Lsn) -> PageReconstructResult<HashSet<TransactionId>> {
pub async fn list_twophase_files(
&self,
lsn: Lsn,
) -> Result<HashSet<TransactionId>, PageReconstructError> {
// fetch directory entry
let buf = try_no_ondemand_download!(self.get(TWOPHASEDIR_KEY, lsn));
let buf = self.get(TWOPHASEDIR_KEY, lsn).await?;
match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
Ok(dir) => PageReconstructResult::Success(dir.xids),
Err(e) => PageReconstructResult::from(e),
Ok(dir) => Ok(dir.xids),
Err(e) => Err(PageReconstructError::from(e)),
}
}
pub fn get_control_file(&self, lsn: Lsn) -> PageReconstructResult<Bytes> {
self.get(CONTROLFILE_KEY, lsn)
pub async fn get_control_file(&self, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
self.get(CONTROLFILE_KEY, lsn).await
}
pub fn get_checkpoint(&self, lsn: Lsn) -> PageReconstructResult<Bytes> {
self.get(CHECKPOINT_KEY, lsn)
pub async fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
self.get(CHECKPOINT_KEY, lsn).await
}
/// Does the same as get_current_logical_size but counted on demand.
@@ -460,20 +471,24 @@ impl Timeline {
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
// Fetch list of database dirs and iterate them
let buf = self.get_download(DBDIR_KEY, lsn).await?;
let buf = self.get(DBDIR_KEY, lsn).await.context("read dbdir")?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in
crate::tenant::with_ondemand_download(|| self.list_rels(*spcnode, *dbnode, lsn))
.await?
for rel in self
.list_rels(*spcnode, *dbnode, lsn)
.await
.context("list rels")?
{
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
let mut buf = self.get_download(relsize_key, lsn).await?;
let mut buf = self
.get(relsize_key, lsn)
.await
.context("read relation size of {rel:?}")?;
let relsize = buf.get_u32_le();
total_size += relsize as u64;
@@ -494,7 +509,7 @@ impl Timeline {
result.add_key(DBDIR_KEY);
// Fetch list of database dirs and iterate them
let buf = self.get_download(DBDIR_KEY, lsn).await?;
let buf = self.get(DBDIR_KEY, lsn).await?;
let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
@@ -503,15 +518,15 @@ impl Timeline {
result.add_key(relmap_file_key(spcnode, dbnode));
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> =
with_ondemand_download(|| self.list_rels(spcnode, dbnode, lsn))
.await?
.into_iter()
.collect();
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, lsn)
.await?
.into_iter()
.collect();
rels.sort_unstable();
for rel in rels {
let relsize_key = rel_size_to_key(rel);
let mut buf = self.get_download(relsize_key, lsn).await?;
let mut buf = self.get(relsize_key, lsn).await?;
let relsize = buf.get_u32_le();
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
@@ -527,13 +542,13 @@ impl Timeline {
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get_download(slrudir_key, lsn).await?;
let buf = self.get(slrudir_key, lsn).await?;
let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get_download(segsize_key, lsn).await?;
let mut buf = self.get(segsize_key, lsn).await?;
let segsize = buf.get_u32_le();
result.add_range(
@@ -545,7 +560,7 @@ impl Timeline {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let buf = self.get_download(TWOPHASEDIR_KEY, lsn).await?;
let buf = self.get(TWOPHASEDIR_KEY, lsn).await?;
let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort_unstable();
@@ -703,9 +718,14 @@ impl<'a> DatadirModification<'a> {
}
/// Store a relmapper file (pg_filenode.map) in the repository
pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> anyhow::Result<()> {
pub async fn put_relmap_file(
&mut self,
spcnode: Oid,
dbnode: Oid,
img: Bytes,
) -> anyhow::Result<()> {
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY).no_ondemand_download()?;
let buf = self.get(DBDIR_KEY).await?;
let mut dbdir = DbDirectory::des(&buf)?;
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
@@ -731,9 +751,13 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> anyhow::Result<()> {
pub async fn put_twophase_file(
&mut self,
xid: TransactionId,
img: Bytes,
) -> anyhow::Result<()> {
// Add it to the directory entry
let buf = self.get(TWOPHASEDIR_KEY).no_ondemand_download()?;
let buf = self.get(TWOPHASEDIR_KEY).await?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
@@ -757,16 +781,16 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
pub async fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
let req_lsn = self.tline.get_last_record_lsn();
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, req_lsn, true)
.no_ondemand_download()?;
.await?;
// Remove entry from dbdir
let buf = self.get(DBDIR_KEY).no_ondemand_download()?;
let buf = self.get(DBDIR_KEY).await?;
let mut dir = DbDirectory::des(&buf)?;
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
let buf = DbDirectory::ser(&dir)?;
@@ -789,11 +813,15 @@ impl<'a> DatadirModification<'a> {
/// Create a relation fork.
///
/// 'nblocks' is the initial size.
pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
pub async fn put_rel_creation(
&mut self,
rel: RelTag,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY).no_ondemand_download()?)?;
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY).await?)?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
// Didn't exist. Update dbdir
@@ -805,7 +833,7 @@ impl<'a> DatadirModification<'a> {
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key).no_ondemand_download()?)?
RelDirectory::des(&self.get(rel_dir_key).await?)?
};
// Add the new relation to the rel directory entry, and write it back
@@ -833,17 +861,17 @@ impl<'a> DatadirModification<'a> {
}
/// Truncate relation
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
pub async fn put_rel_truncation(
&mut self,
rel: RelTag,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
let last_lsn = self.tline.get_last_record_lsn();
if self
.tline
.get_rel_exists(rel, last_lsn, true)
.no_ondemand_download()?
{
if self.tline.get_rel_exists(rel, last_lsn, true).await? {
let size_key = rel_size_to_key(rel);
// Fetch the old size first
let old_size = self.get(size_key).no_ondemand_download()?.get_u32_le();
let old_size = self.get(size_key).await?.get_u32_le();
// Update the entry with the new size.
let buf = nblocks.to_le_bytes();
@@ -863,12 +891,16 @@ impl<'a> DatadirModification<'a> {
/// Extend relation
/// If new size is smaller, do nothing.
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> anyhow::Result<()> {
pub async fn put_rel_extend(
&mut self,
rel: RelTag,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
// Put size
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key).no_ondemand_download()?.get_u32_le();
let old_size = self.get(size_key).await?.get_u32_le();
// only extend relation here. never decrease the size
if nblocks > old_size {
@@ -884,12 +916,12 @@ impl<'a> DatadirModification<'a> {
}
/// Drop a relation.
pub fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> {
pub async fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
// Remove it from the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let buf = self.get(dir_key).no_ondemand_download()?;
let buf = self.get(dir_key).await?;
let mut dir = RelDirectory::des(&buf)?;
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
@@ -900,7 +932,7 @@ impl<'a> DatadirModification<'a> {
// update logical size
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key).no_ondemand_download()?.get_u32_le();
let old_size = self.get(size_key).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove enty from relation size cache
@@ -912,7 +944,7 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn put_slru_segment_creation(
pub async fn put_slru_segment_creation(
&mut self,
kind: SlruKind,
segno: u32,
@@ -920,7 +952,7 @@ impl<'a> DatadirModification<'a> {
) -> anyhow::Result<()> {
// Add it to the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key).no_ondemand_download()?;
let buf = self.get(dir_key).await?;
let mut dir = SlruSegmentDirectory::des(&buf)?;
if !dir.segments.insert(segno) {
@@ -956,10 +988,10 @@ impl<'a> DatadirModification<'a> {
}
/// This method is used for marking truncated SLRU files
pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> {
pub async fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> {
// Remove it from the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key).no_ondemand_download()?;
let buf = self.get(dir_key).await?;
let mut dir = SlruSegmentDirectory::des(&buf)?;
if !dir.segments.remove(&segno) {
@@ -983,9 +1015,9 @@ impl<'a> DatadirModification<'a> {
}
/// This method is used for marking truncated SLRU files
pub fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
pub async fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY).no_ondemand_download()?;
let buf = self.get(TWOPHASEDIR_KEY).await?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
@@ -1079,7 +1111,7 @@ impl<'a> DatadirModification<'a> {
// Internal helper functions to batch the modifications
fn get(&self, key: Key) -> PageReconstructResult<Bytes> {
async fn get(&self, key: Key) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the pending updated
// version in that case.
//
@@ -1087,18 +1119,20 @@ impl<'a> DatadirModification<'a> {
// value that has been removed, deletion only avoids leaking storage.
if let Some(value) = self.pending_updates.get(&key) {
if let Value::Image(img) = value {
PageReconstructResult::Success(img.clone())
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
PageReconstructResult::from(anyhow::anyhow!("unexpected pending WAL record"))
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
)))
}
} else {
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn)
self.tline.get(key, lsn).await
}
}

View File

@@ -92,7 +92,7 @@ mod timeline;
pub mod size;
pub use timeline::{with_ondemand_download, PageReconstructError, PageReconstructResult, Timeline};
pub use timeline::{PageReconstructError, Timeline};
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -2816,15 +2816,15 @@ mod tests {
drop(writer);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x10)).await?,
TEST_IMG("foo at 0x10")
);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x1f)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x1f)).await?,
TEST_IMG("foo at 0x10")
);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x20)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x20)).await?,
TEST_IMG("foo at 0x20")
);
@@ -2903,15 +2903,15 @@ mod tests {
// Check page contents on both branches
assert_eq!(
from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40)).no_ondemand_download()?)?,
from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40)).await?)?,
"foo at 0x40"
);
assert_eq!(
from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40)).no_ondemand_download()?)?,
from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40)).await?)?,
"bar at 0x40"
);
assert_eq!(
from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40)).no_ondemand_download()?)?,
from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40)).await?)?,
"foobar at 0x20"
);
@@ -3070,10 +3070,7 @@ mod tests {
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)
.await?;
assert!(newtline
.get(*TEST_KEY, Lsn(0x25))
.no_ondemand_download()
.is_ok());
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).await.is_ok());
Ok(())
}
@@ -3103,7 +3100,7 @@ mod tests {
// Check that the data is still accessible on the branch.
assert_eq!(
newtline.get(*TEST_KEY, Lsn(0x50)).no_ondemand_download()?,
newtline.get(*TEST_KEY, Lsn(0x50)).await?,
TEST_IMG(&format!("foo at {}", Lsn(0x40)))
);
@@ -3251,23 +3248,23 @@ mod tests {
tline.compact().await?;
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x10)).await?,
TEST_IMG("foo at 0x10")
);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x1f)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x1f)).await?,
TEST_IMG("foo at 0x10")
);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x20)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x20)).await?,
TEST_IMG("foo at 0x20")
);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x30)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x30)).await?,
TEST_IMG("foo at 0x30")
);
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x40)).no_ondemand_download()?,
tline.get(*TEST_KEY, Lsn(0x40)).await?,
TEST_IMG("foo at 0x40")
);
@@ -3377,7 +3374,7 @@ mod tests {
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, lsn).no_ondemand_download()?,
tline.get(test_key, lsn).await?,
TEST_IMG(&format!("{} at {}", blknum, last_lsn))
);
}
@@ -3463,7 +3460,7 @@ mod tests {
for (blknum, last_lsn) in updated.iter().enumerate() {
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, lsn).no_ondemand_download()?,
tline.get(test_key, lsn).await?,
TEST_IMG(&format!("{} at {}", blknum, last_lsn))
);
}
@@ -3538,7 +3535,7 @@ mod tests {
println!("checking [{idx}][{blknum}] at {lsn}");
test_key.field6 = blknum as u32;
assert_eq!(
tline.get(test_key, *lsn).no_ondemand_download()?,
tline.get(test_key, *lsn).await?,
TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
);
}

View File

@@ -109,7 +109,7 @@ pub trait Layer: Send + Sync {
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call, or a struct with a cached older image of the page if one
/// is available. If this returns PageReconstructResult::Continue, look up
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data(

View File

@@ -335,43 +335,6 @@ pub struct WalReceiverInfo {
pub last_received_msg_ts: u128,
}
/// Like `?`, but for [`PageReconstructResult`].
/// Use it to bubble up the `NeedsDownload` and `Error` to the caller.
///
/// Once `std::ops::Try` is stabilized, we should use it instead of this macro.
#[macro_export]
macro_rules! try_no_ondemand_download {
($result:expr) => {{
let result = $result;
match result {
PageReconstructResult::Success(value) => value,
PageReconstructResult::NeedsDownload(timeline, layer) => {
return PageReconstructResult::NeedsDownload(timeline, layer);
}
PageReconstructResult::Error(e) => return PageReconstructResult::Error(e),
}
}};
}
/// Replacement for `?` in functions that return [`PageReconstructResult`].
///
/// Given an `expr: Result<T, E>`, use `try_page_reconstruct_result!(expr)`
/// instead of `(expr)?`.
/// If `expr` is `Ok(v)`, the macro evaluates to `v`.
/// If `expr` is `Err(e)`, the macro returns `PageReconstructResult::Error(e.into())`.
///
/// Once `std::ops::Try` is stabilized, we should use it instead of this macro.
#[macro_export]
macro_rules! try_page_reconstruct_result {
($result:expr) => {{
let result = $result;
match result {
Ok(v) => v,
Err(e) => return PageReconstructResult::from(e),
}
}};
}
///
/// Information about how much history needs to be retained, needed by
/// Garbage Collection.
@@ -401,15 +364,6 @@ pub struct GcInfo {
pub pitr_cutoff: Lsn,
}
pub enum PageReconstructResult<T> {
Success(T),
/// The given RemoteLayer needs to be downloaded and replaced in the timeline's layer map
/// for the operation to succeed. Use [`Timeline::download_remote_layer`] to do it, then
/// retry the operation that returned this error.
NeedsDownload(Weak<Timeline>, Weak<RemoteLayer>),
Error(PageReconstructError),
}
/// An error happened in a get() operation.
#[derive(thiserror::Error)]
pub enum PageReconstructError {
@@ -429,49 +383,6 @@ impl std::fmt::Debug for PageReconstructError {
}
}
/// This impl makes it so you can substitute the return type
/// `Result<T, E>` with `PageReconstructResult<T>` in functions,
/// and existing error conversions generally continue to work:
/// any error type that converts into `PageReconstructError` is
/// turned into `PageReconstructResult::Error` by this blanket impl.
impl<E, T> From<E> for PageReconstructResult<T>
where
E: Into<PageReconstructError>,
{
fn from(e: E) -> Self {
Self::Error(e.into())
}
}
impl<T> PageReconstructResult<T> {
/// Treat the need for on-demand download as an error.
///
/// **Avoid this function in new code** if you can help it,
/// as on-demand download will become the norm in the future,
/// especially once we implement layer file eviction.
///
/// If you are in an async function, use [`with_ondemand_download`]
/// to do the download right here.
///
/// If you are in a sync function, change its return type from
/// `Result<T, E>` to `PageReconstructResult<T>` and bubble up
/// the non-success cases of `PageReconstructResult<T>` to the caller.
/// This gives them a chance to do the download and retry.
/// Consider using [`try_no_ondemand_download`] for convenience.
///
/// For more background, read the comment on [`with_ondemand_download`].
pub fn no_ondemand_download(self) -> anyhow::Result<T> {
match self {
PageReconstructResult::Success(value) => Ok(value),
// TODO print more info about the timeline
PageReconstructResult::NeedsDownload(_, _) => anyhow::bail!("Layer needs downloading"),
PageReconstructResult::Error(e) => {
Err(anyhow::Error::new(e).context("Failed to reconstruct the page"))
}
}
}
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -493,15 +404,19 @@ impl Timeline {
/// Look up given page version.
///
/// NOTE: It is considered an error to 'get' a key that doesn't exist. The abstraction
/// above this needs to store suitable metadata to track what data exists with
/// what keys, in separate metadata entries. If a non-existent key is requested,
/// the Repository implementation may incorrectly return a value from an ancestor
/// branch, for example, or waste a lot of cycles chasing the non-existing key.
/// If a remote layer file is needed, it is downloaded as part of this
/// call.
///
pub fn get(&self, key: Key, lsn: Lsn) -> PageReconstructResult<Bytes> {
/// NOTE: It is considered an error to 'get' a key that doesn't exist. The
/// abstraction above this needs to store suitable metadata to track what
/// data exists with what keys, in separate metadata entries. If a
/// non-existent key is requested, we may incorrectly return a value from
/// an ancestor branch, for example, or waste a lot of cycles chasing the
/// non-existing key.
///
pub async fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return PageReconstructResult::from(anyhow!("Invalid LSN"));
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
}
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
@@ -512,7 +427,7 @@ impl Timeline {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => return PageReconstructResult::Success(cached_img), // exact LSN match, return the image
Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
@@ -527,18 +442,14 @@ impl Timeline {
img: cached_page_img,
};
try_no_ondemand_download!(self.get_reconstruct_data(key, lsn, &mut reconstruct_state));
self.get_reconstruct_data(key, lsn, &mut reconstruct_state)
.await?;
self.metrics
.reconstruct_time_histo
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
}
// Like get(), but if a remote layer file is needed, it is downloaded as part of this call.
pub async fn get_download(&self, key: Key, lsn: Lsn) -> anyhow::Result<Bytes> {
with_ondemand_download(|| self.get(key, lsn)).await
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -1630,12 +1541,12 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
fn get_reconstruct_data(
async fn get_reconstruct_data(
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
) -> PageReconstructResult<()> {
) -> Result<(), PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -1662,34 +1573,34 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return PageReconstructResult::Success(()),
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
return PageReconstructResult::Success(());
return Ok(());
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
return layer_traversal_error(format!(
return Err(layer_traversal_error(format!(
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
key,
Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn
), traversal_path);
), traversal_path));
}
prev_lsn = cont_lsn;
}
ValueReconstructResult::Missing => {
return layer_traversal_error(
return Err(layer_traversal_error(
format!(
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
),
traversal_path,
);
));
}
}
@@ -1702,7 +1613,7 @@ impl Timeline {
);
let ancestor = match timeline.get_ancestor_timeline() {
Ok(timeline) => timeline,
Err(e) => return PageReconstructResult::from(e),
Err(e) => return Err(PageReconstructError::from(e)),
};
timeline_owned = ancestor;
timeline = &*timeline_owned;
@@ -1711,7 +1622,7 @@ impl Timeline {
}
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'_layer_map_search: loop {
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
@@ -1730,7 +1641,7 @@ impl Timeline {
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
traversal_path.push((
@@ -1755,7 +1666,7 @@ impl Timeline {
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
traversal_path.push((
@@ -1788,7 +1699,7 @@ impl Timeline {
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
traversal_path.push((
@@ -1812,27 +1723,24 @@ impl Timeline {
continue 'outer;
}
};
// Indicate to the caller that we need remote_layer replaced with a downloaded
// layer in the layer map. The control flow could be a lot simpler, but the point
// of this commit is to prepare this function to
// 1. become async
// 2. do the download right here, using
// ```
// download_remote_layer().await?;
// continue 'layer_map_search;
// ```
// For (2), current rustc requires that the layers lock guard is not in scope.
// Hence, the complicated control flow.
// Download the remote_layer and replace it in the layer map.
// For that, we need to release the mutex. Otherwise, we'd deadlock.
//
// The control flow is so weird here because `drop(layers)` inside
// the if stmt above is not enough for current rustc: it requires
// that the layers lock guard is not in scope across the download
// await point.
let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
info!(
"need remote layer {}",
remote_layer_as_persistent.traversal_id()
);
return PageReconstructResult::NeedsDownload(
Weak::clone(&timeline.myself),
Arc::downgrade(&remote_layer),
);
let id = remote_layer_as_persistent.traversal_id();
info!("need remote layer {id}");
// The next layer doesn't exist locally. Need to download it.
// (The control flow is a bit complicated here because we must drop the 'layers'
// lock before awaiting on the Future.)
info!("on-demand downloading remote layer {id}");
timeline.download_remote_layer(remote_layer).await?;
continue 'layer_map_search;
}
}
}
@@ -2270,7 +2178,7 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
) -> anyhow::Result<HashMap<LayerFileName, LayerFileMetadata>> {
) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
for partition in partitioning.parts.iter() {
@@ -2286,13 +2194,15 @@ impl Timeline {
)?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
anyhow::bail!("failpoint image-layer-writer-fail-before-finish");
Err(PageReconstructError::Other(anyhow::anyhow!(
"failpoint image-layer-writer-fail-before-finish"
)))
});
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
let img = match self.get_download(key, lsn).await {
let img = match self.get(key, lsn).await {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
@@ -2343,7 +2253,7 @@ impl Timeline {
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
))
.collect::<Vec<_>>();
par_fsync::par_fsync(&all_paths)?;
par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?;
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
@@ -2351,7 +2261,10 @@ impl Timeline {
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
let path = l.filename();
let metadata = timeline_path.join(path.file_name()).metadata()?;
let metadata = timeline_path
.join(path.file_name())
.metadata()
.context("reading metadata of layer file {path}")?;
layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
@@ -2752,8 +2665,7 @@ impl Timeline {
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
match with_ondemand_download(|| self.find_lsn_for_timestamp(pitr_timestamp)).await?
{
match self.find_lsn_for_timestamp(pitr_timestamp).await? {
LsnForTimestamp::Present(lsn) => lsn,
LsnForTimestamp::Future(lsn) => {
// The timestamp is in the future. That sounds impossible,
@@ -3022,7 +2934,7 @@ impl Timeline {
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
) -> PageReconstructResult<Bytes> {
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
@@ -3034,11 +2946,11 @@ impl Timeline {
key,
img_lsn
);
PageReconstructResult::Success(img.clone())
Ok(img.clone())
} else {
PageReconstructResult::from(anyhow!(
Err(PageReconstructError::from(anyhow!(
"base image for {key} at {request_lsn} not found"
))
)))
}
} else {
// We need to do WAL redo.
@@ -3046,12 +2958,12 @@ impl Timeline {
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
PageReconstructResult::from(anyhow!(
Err(PageReconstructError::from(anyhow!(
"Base image for {} at {} not found, but got {} WAL records",
key,
request_lsn,
data.records.len()
))
)))
} else {
if data.img.is_some() {
trace!(
@@ -3072,7 +2984,7 @@ impl Timeline {
.context("Failed to reconstruct a page image:")
{
Ok(img) => img,
Err(e) => return PageReconstructResult::from(e),
Err(e) => return Err(PageReconstructError::from(e)),
};
if img.len() == page_cache::PAGE_SZ {
@@ -3087,11 +2999,11 @@ impl Timeline {
)
.context("Materialized page memoization failed")
{
return PageReconstructResult::from(e);
return Err(PageReconstructError::from(e));
}
}
PageReconstructResult::Success(img)
Ok(img)
}
}
}
@@ -3117,7 +3029,7 @@ impl Timeline {
/// So, the current download attempt will run to completion even if we stop polling.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
self: Arc<Self>,
&self,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
let permit = match Arc::clone(&remote_layer.ongoing_download)
@@ -3133,6 +3045,7 @@ impl Timeline {
let (sender, receiver) = tokio::sync::oneshot::channel();
// Spawn a task so that download does not outlive timeline when we detach tenant / delete timeline.
let self_clone = self.myself.upgrade().expect("timeline is gone");
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::RemoteDownloadTask,
@@ -3141,7 +3054,7 @@ impl Timeline {
&format!("download layer {}", remote_layer.short_id()),
false,
async move {
let remote_client = self.remote_client.as_ref().unwrap();
let remote_client = self_clone.remote_client.as_ref().unwrap();
// Does retries + exponential back-off internally.
// When this fails, don't layer further retry attempts here.
@@ -3152,12 +3065,12 @@ impl Timeline {
if let Ok(size) = &result {
// XXX the temp file is still around in Err() case
// and consumes space until we clean up upon pageserver restart.
self.metrics.resident_physical_size_gauge.add(*size);
self_clone.metrics.resident_physical_size_gauge.add(*size);
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let new_layer = remote_layer.create_downloaded_layer(self.conf, *size);
let mut layers = self.layers.write().unwrap();
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
let mut layers = self_clone.layers.write().unwrap();
{
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
layers.remove_historic(l);
@@ -3254,12 +3167,7 @@ impl Timeline {
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
.map({
|l| {
let self_clone = Arc::clone(self);
self_clone.download_remote_layer(l)
}
})
.map(|l| self.download_remote_layer(l))
.collect()
};
@@ -3321,101 +3229,15 @@ impl Timeline {
}
}
/// Helper function to deal with [`PageReconstructResult`].
///
/// Takes a sync closure that returns a [`PageReconstructResult`].
/// If it is [`PageReconstructResult::NeedsDownload`],
/// do the download and retry the closure.
///
/// ### Background
///
/// This is a crutch to make on-demand downloads efficient in
/// our async-sync-async sandwich codebase. Some context:
///
/// - The code that does the downloads uses async Rust.
/// - The code that initiates download is many levels of sync Rust.
/// - The sync code must wait for the download to finish to
/// make further progress.
/// - The sync code is invoked directly from async functions upstack.
///
/// Example (there are also much worse ones where the sandwich is taller)
///
/// async handle_get_page_at_lsn_request page_service.rs
/// sync get_rel_page_at_lsn timeline.rs
/// sync timeline.get timeline.rs
/// sync get_reconstruct_data timeline.rs
/// async download_remote_layer timeline.rs
///
/// It is not possible to Timeline::download_remote_layer().await within
/// get_reconstruct_data, so instead, we return [`PageReconstructResult::NeedsDownload`]
/// which contains references to the [`Timeline`] and [`RemoteLayer`].
/// We bubble that error upstack to the async code, which can then call
/// `Timeline::download_remote_layer().await`.
/// That is _efficient_ because tokio can use the same OS thread to do
/// other work while we're waiting for the download.
///
/// It is a deliberate decision to use a new result type to communicate
/// the need for download instead of adding another variant to [`PageReconstructError`].
/// The reason is that with the latter approach, any place that does
/// `?` on a `Result<T, PageReconstructError>` will implicitly ignore the
/// need for download. We want that to be explicit, so that
/// - the code base becomes greppable for places that don't do a download
/// - future code changes will need to explicilty address for on-demand download
///
/// Alternatives to consider in the future:
///
/// - Inside `get_reconstruct_data`, we can std::thread::spawn a thread
/// and use it to block_on the download_remote_layer future.
/// That is obviously inefficient as it creates one thread per download.
/// - Convert everything to async. The problem here is that the sync
/// functions are used by many other sync functions. So, the scope
/// creep of such a conversion is tremendous.
/// - Compromise between the two: implement async functions for each sync
/// function. Switch over the hot code paths (GetPage()) to use the
/// async path, so that the hot path doesn't spawn threads. Other code
/// paths would remain sync initially, and get converted to async over time.
///
pub async fn with_ondemand_download<F, T>(mut f: F) -> Result<T, anyhow::Error>
where
F: Send + FnMut() -> PageReconstructResult<T>,
T: Send,
{
loop {
let closure_result = f();
match closure_result {
PageReconstructResult::NeedsDownload(weak_timeline, weak_remote_layer) => {
// if the timeline is gone, it has likely been deleted / tenant detached
let tl = weak_timeline.upgrade().context("timeline is gone")?;
// if the remote layer got removed, retry the function, it might succeed now
let remote_layer = match weak_remote_layer.upgrade() {
None => {
info!("remote layer is gone, retrying closure");
continue;
}
Some(l) => l,
};
// Does retries internally
tl.download_remote_layer(remote_layer).await?;
// Download successful, retry the closure
continue;
}
PageReconstructResult::Success(closure_value) => return Ok(closure_value),
PageReconstructResult::Error(e) => {
return Err(anyhow::Error::new(e).context("Failed to reconstruct the page"))
}
}
}
}
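// Editor's note: an illustrative before/after sketch, not part of the diff.
// With the wrapper above removed, call sites stop wrapping a sync closure and
// simply await the now-async Timeline method, which performs any needed
// download itself. Names such as `timeline`, `rel`, and `lsn` are assumed to
// be in scope, mirroring the call sites changed elsewhere in this diff:
//
//     // before: sync inner call, download + retry handled by the wrapper
//     let nblocks =
//         with_ondemand_download(|| timeline.get_rel_size(rel, lsn, true)).await?;
//
//     // after: the method is async and downloads missing layers itself
//     let nblocks = timeline.get_rel_size(rel, lsn, true).await?;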
type TraversalPathItem = (
ValueReconstructResult,
Lsn,
Box<dyn FnOnce() -> TraversalId>,
Box<dyn Send + FnOnce() -> TraversalId>,
);
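// Editor's note: an illustrative sketch, not part of the diff. The added
// `Send` bound is presumably needed because these boxed closures can now be
// held across `.await` points; a future is only `Send` (and thus spawnable on
// a multi-threaded runtime such as tokio's) if everything it holds across an
// await is `Send`. Hypothetical example, assuming the tokio crate:
//
//     async fn traverse(item: Box<dyn Send + FnOnce() -> String>) {
//         tokio::task::yield_now().await; // `item` is held across this await
//         let _id = item();
//     }
//     // tokio::spawn(traverse(Box::new(|| "layer".to_string())));
//     // This compiles only because the boxed closure is `Send`.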
/// Helper function for get_reconstruct_data() to add the path of layers traversed
/// to an error, as anyhow context information.
fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructResult<()> {
fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
// We want the original 'msg' to be the outermost context. The outermost context
// is the most high-level information, which also gets propagated to the client.
let mut msg_iter = path
@@ -3434,7 +3256,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// Append all subsequent traversals, and the error message 'msg', as contexts.
let msg = msg_iter.fold(err, |err, msg| err.context(msg));
PageReconstructResult::from(msg)
PageReconstructError::from(msg)
}
/// Various functions to mutate the timeline.

View File

@@ -30,8 +30,8 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::pgdatadir_mapping::*;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::tenant::{with_ondemand_download, PageReconstructError};
use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -55,8 +55,7 @@ impl<'a> WalIngest<'a> {
pub async fn new(timeline: &Timeline, startpoint: Lsn) -> anyhow::Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes =
with_ondemand_download(|| timeline.get_checkpoint(startpoint)).await?;
let checkpoint_bytes = timeline.get_checkpoint(startpoint).await?;
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
@@ -107,7 +106,7 @@ impl<'a> WalIngest<'a> {
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create)?;
self.ingest_xlog_smgr_create(modification, &create).await?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
@@ -135,7 +134,7 @@ impl<'a> WalIngest<'a> {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
modification.drop_dbdir(tablespace_id, dropdb.db_id).await?;
}
}
} else if self.timeline.pg_version == 15 {
@@ -159,7 +158,7 @@ impl<'a> WalIngest<'a> {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
modification.drop_dbdir(tablespace_id, dropdb.db_id).await?;
}
}
}
@@ -214,9 +213,11 @@ impl<'a> WalIngest<'a> {
parsed_xact.xid,
lsn,
);
modification.drop_twophase_file(parsed_xact.xid)?;
modification.drop_twophase_file(parsed_xact.xid).await?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
modification.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]))?;
modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]))
.await?;
}
} else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -250,11 +251,13 @@ impl<'a> WalIngest<'a> {
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec)?;
self.ingest_multixact_truncate_record(modification, &xlrec)
.await?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded)?;
self.ingest_relmap_page(modification, &xlrec, decoded)
.await?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -534,23 +537,21 @@ impl<'a> WalIngest<'a> {
// get calls instead.
let req_lsn = modification.tline.get_last_record_lsn();
let rels = with_ondemand_download(|| {
modification
.tline
.list_rels(src_tablespace_id, src_db_id, req_lsn)
})
.await?;
let rels = modification
.tline
.list_rels(src_tablespace_id, src_db_id, req_lsn)
.await?;
debug!("ingest_xlog_dbase_create: {} rels", rels.len());
// Copy relfilemap
let filemap = with_ondemand_download(|| {
modification
.tline
.get_relmap_file(src_tablespace_id, src_db_id, req_lsn)
})
.await?;
modification.put_relmap_file(tablespace_id, db_id, filemap)?;
let filemap = modification
.tline
.get_relmap_file(src_tablespace_id, src_db_id, req_lsn)
.await?;
modification
.put_relmap_file(tablespace_id, db_id, filemap)
.await?;
let mut num_rels_copied = 0;
let mut num_blocks_copied = 0;
@@ -558,9 +559,10 @@ impl<'a> WalIngest<'a> {
assert_eq!(src_rel.spcnode, src_tablespace_id);
assert_eq!(src_rel.dbnode, src_db_id);
let nblocks =
with_ondemand_download(|| modification.tline.get_rel_size(src_rel, req_lsn, true))
.await?;
let nblocks = modification
.tline
.get_rel_size(src_rel, req_lsn, true)
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
dbnode: db_id,
@@ -568,19 +570,17 @@ impl<'a> WalIngest<'a> {
forknum: src_rel.forknum,
};
modification.put_rel_creation(dst_rel, nblocks)?;
modification.put_rel_creation(dst_rel, nblocks).await?;
// Copy content
debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
for blknum in 0..nblocks {
debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
let content = with_ondemand_download(|| {
modification
.tline
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true)
})
.await?;
let content = modification
.tline
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true)
.await?;
modification.put_rel_page_image(dst_rel, blknum, content)?;
num_blocks_copied += 1;
}
@@ -595,9 +595,9 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn ingest_xlog_smgr_create(
async fn ingest_xlog_smgr_create(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rec: &XlSmgrCreate,
) -> anyhow::Result<()> {
let rel = RelTag {
@@ -606,7 +606,7 @@ impl<'a> WalIngest<'a> {
relnode: rec.rnode.relnode,
forknum: rec.forknum,
};
self.put_rel_creation(modification, rel)?;
self.put_rel_creation(modification, rel).await?;
Ok(())
}
@@ -629,7 +629,8 @@ impl<'a> WalIngest<'a> {
relnode,
forknum: MAIN_FORKNUM,
};
self.put_rel_truncation(modification, rel, rec.blkno)?;
self.put_rel_truncation(modification, rel, rec.blkno)
.await?;
}
if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
let rel = RelTag {
@@ -650,7 +651,8 @@ impl<'a> WalIngest<'a> {
let nblocks = self.get_relsize(rel, modification.lsn).await?;
if nblocks > fsm_physical_page_no {
// check if something to do: FSM is larger than truncate position
self.put_rel_truncation(modification, rel, fsm_physical_page_no)?;
self.put_rel_truncation(modification, rel, fsm_physical_page_no)
.await?;
}
}
if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
@@ -671,7 +673,8 @@ impl<'a> WalIngest<'a> {
let nblocks = self.get_relsize(rel, modification.lsn).await?;
if nblocks > vm_page_no {
// check if something to do: VM is larger than truncate position
self.put_rel_truncation(modification, rel, vm_page_no)?;
self.put_rel_truncation(modification, rel, vm_page_no)
.await?;
}
}
Ok(())
@@ -740,10 +743,12 @@ impl<'a> WalIngest<'a> {
relnode: xnode.relnode,
};
let last_lsn = self.timeline.get_last_record_lsn();
if with_ondemand_download(|| modification.tline.get_rel_exists(rel, last_lsn, true))
if modification
.tline
.get_rel_exists(rel, last_lsn, true)
.await?
{
self.put_rel_drop(modification, rel)?;
self.put_rel_drop(modification, rel).await?;
}
}
}
@@ -795,16 +800,16 @@ impl<'a> WalIngest<'a> {
// instead.
let req_lsn = modification.tline.get_last_record_lsn();
let slru_segments = with_ondemand_download(|| {
modification
.tline
.list_slru_segments(SlruKind::Clog, req_lsn)
})
.await?;
let slru_segments = modification
.tline
.list_slru_segments(SlruKind::Clog, req_lsn)
.await?;
for segno in slru_segments {
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
modification.drop_slru_segment(SlruKind::Clog, segno)?;
modification
.drop_slru_segment(SlruKind::Clog, segno)
.await?;
trace!("Drop CLOG segment {:>04X}", segno);
}
}
@@ -891,9 +896,9 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn ingest_multixact_truncate_record(
async fn ingest_multixact_truncate_record(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
xlrec: &XlMultiXactTruncate,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
@@ -909,7 +914,9 @@ impl<'a> WalIngest<'a> {
// Delete all the segments except the last one. The last segment can still
// contain, possibly partially, valid data.
while segment != endsegment {
modification.drop_slru_segment(SlruKind::MultiXactMembers, segment as u32)?;
modification
.drop_slru_segment(SlruKind::MultiXactMembers, segment as u32)
.await?;
/* move to next segment, handling wraparound correctly */
if segment == maxsegment {
@@ -925,9 +932,9 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn ingest_relmap_page(
async fn ingest_relmap_page(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
) -> Result<()> {
@@ -936,17 +943,19 @@ impl<'a> WalIngest<'a> {
// skip xl_relmap_update
buf.advance(12);
modification.put_relmap_file(xlrec.tsid, xlrec.dbid, Bytes::copy_from_slice(&buf[..]))?;
modification
.put_relmap_file(xlrec.tsid, xlrec.dbid, Bytes::copy_from_slice(&buf[..]))
.await?;
Ok(())
}
fn put_rel_creation(
async fn put_rel_creation(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rel: RelTag,
) -> Result<()> {
modification.put_rel_creation(rel, 0)?;
modification.put_rel_creation(rel, 0).await?;
Ok(())
}
@@ -974,28 +983,31 @@ impl<'a> WalIngest<'a> {
Ok(())
}
fn put_rel_truncation(
async fn put_rel_truncation(
&mut self,
modification: &mut DatadirModification,
modification: &mut DatadirModification<'_>,
rel: RelTag,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
modification.put_rel_truncation(rel, nblocks)?;
modification.put_rel_truncation(rel, nblocks).await?;
Ok(())
}
fn put_rel_drop(&mut self, modification: &mut DatadirModification, rel: RelTag) -> Result<()> {
modification.put_rel_drop(rel)?;
async fn put_rel_drop(
&mut self,
modification: &mut DatadirModification<'_>,
rel: RelTag,
) -> Result<()> {
modification.put_rel_drop(rel).await?;
Ok(())
}
async fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> anyhow::Result<BlockNumber> {
let exists =
with_ondemand_download(|| self.timeline.get_rel_exists(rel, lsn, true)).await?;
let exists = self.timeline.get_rel_exists(rel, lsn, true).await?;
let nblocks = if !exists {
0
} else {
with_ondemand_download(|| self.timeline.get_rel_size(rel, lsn, true)).await?
self.timeline.get_rel_size(rel, lsn, true).await?
};
Ok(nblocks)
}
@@ -1011,19 +1023,17 @@ impl<'a> WalIngest<'a> {
// record.
// TODO: would be nice to be more explicit about it
let last_lsn = modification.lsn;
let old_nblocks =
if !with_ondemand_download(|| self.timeline.get_rel_exists(rel, last_lsn, true)).await?
{
// create it with 0 size initially, the logic below will extend it
modification.put_rel_creation(rel, 0)?;
0
} else {
with_ondemand_download(|| self.timeline.get_rel_size(rel, last_lsn, true)).await?
};
let old_nblocks = if !self.timeline.get_rel_exists(rel, last_lsn, true).await? {
// create it with 0 size initially, the logic below will extend it
modification.put_rel_creation(rel, 0).await?;
0
} else {
self.timeline.get_rel_size(rel, last_lsn, true).await?
};
if new_nblocks > old_nblocks {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
modification.put_rel_extend(rel, new_nblocks)?;
modification.put_rel_extend(rel, new_nblocks).await?;
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
@@ -1063,16 +1073,19 @@ impl<'a> WalIngest<'a> {
// record.
// TODO: would be nice to be more explicit about it
let last_lsn = self.timeline.get_last_record_lsn();
let old_nblocks = if !with_ondemand_download(|| {
self.timeline.get_slru_segment_exists(kind, segno, last_lsn)
})
.await?
let old_nblocks = if !self
.timeline
.get_slru_segment_exists(kind, segno, last_lsn)
.await?
{
// create it with 0 size initially, the logic below will extend it
modification.put_slru_segment_creation(kind, segno, 0)?;
modification
.put_slru_segment_creation(kind, segno, 0)
.await?;
0
} else {
with_ondemand_download(|| self.timeline.get_slru_segment_size(kind, segno, last_lsn))
self.timeline
.get_slru_segment_size(kind, segno, last_lsn)
.await?
};
@@ -1124,7 +1137,7 @@ mod tests {
async fn init_walingest_test(tline: &Timeline) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.put_relmap_file(0, 111, Bytes::from("")).await?; // dummy relmapper file
m.commit()?;
let walingest = WalIngest::new(tline, Lsn(0x10)).await?;
@@ -1138,7 +1151,7 @@ mod tests {
let mut walingest = init_walingest_test(&tline).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest.put_rel_creation(&mut m, TESTREL_A).await?;
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))
.await?;
@@ -1163,132 +1176,103 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x10), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x10), false).await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Lsn(0x10), false)
.no_ondemand_download()
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x20), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x20), false).await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x20), false)
.no_ondemand_download()?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x50), false)
.no_ondemand_download()?,
3
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false).await?, 1);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false).await?, 3);
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 0 at 2")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 2 at 5")
);
// Truncate last block
let mut m = tline.begin_modification(Lsn(0x60));
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
walingest.put_rel_truncation(&mut m, TESTREL_A, 2).await?;
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x60), false)
.no_ondemand_download()?,
2
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60), false).await?, 2);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 1 at 4")
);
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x50), false)
.no_ondemand_download()?,
3
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false).await?, 3);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 2 at 5")
);
// Truncate to zero length
let mut m = tline.begin_modification(Lsn(0x68));
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
walingest.put_rel_truncation(&mut m, TESTREL_A, 0).await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false)
.no_ondemand_download()?,
0
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68), false).await?, 0);
// Extend from 0 to 2 blocks, leaving a gap
let mut m = tline.begin_modification(Lsn(0x70));
@@ -1296,22 +1280,17 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))
.await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false)
.no_ondemand_download()?,
2
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70), false).await?, 2);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false)
.no_ondemand_download()?,
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 1")
);
@@ -1321,24 +1300,19 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))
.await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false)
.no_ondemand_download()?,
1501
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80), false).await?, 1501);
for blk in 2..1500 {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false)
.no_ondemand_download()?,
.await?,
ZERO_PAGE
);
}
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false)
.no_ondemand_download()?,
.await?,
TEST_IMG("foo blk 1500")
);
@@ -1361,28 +1335,19 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x20), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x20), false).await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x20), false)
.no_ondemand_download()?,
1
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false).await?, 1);
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A)?;
walingest.put_rel_drop(&mut m, TESTREL_A).await?;
m.commit()?;
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x30), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x30), false).await?,
false
);
@@ -1398,17 +1363,10 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x40), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x40), false).await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x40), false)
.no_ondemand_download()?,
1
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x40), false).await?, 1);
Ok(())
}
@@ -1435,26 +1393,20 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x10), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x10), false).await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Lsn(0x10), false)
.no_ondemand_download()
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x20), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x20), false).await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x20), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(0x20), false).await?,
relsize
);
@@ -1465,7 +1417,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false)
.no_ondemand_download()?,
.await?,
TEST_IMG(&data)
);
}
@@ -1473,16 +1425,11 @@ mod tests {
// Truncate relation so that second segment was dropped
// - only leave one page
let mut m = tline.begin_modification(Lsn(0x60));
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
walingest.put_rel_truncation(&mut m, TESTREL_A, 1).await?;
m.commit()?;
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x60), false)
.no_ondemand_download()?,
1
);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60), false).await?, 1);
for blkno in 0..1 {
let lsn = Lsn(0x20);
@@ -1490,16 +1437,14 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false)
.no_ondemand_download()?,
.await?,
TEST_IMG(&data)
);
}
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x50), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(0x50), false).await?,
relsize
);
for blkno in 0..relsize {
@@ -1508,7 +1453,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false)
.no_ondemand_download()?,
.await?,
TEST_IMG(&data)
);
}
@@ -1526,15 +1471,11 @@ mod tests {
m.commit()?;
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Lsn(0x80), false)
.no_ondemand_download()?,
tline.get_rel_exists(TESTREL_A, Lsn(0x80), false).await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(0x80), false).await?,
relsize
);
// Check relation content
@@ -1544,7 +1485,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false)
.no_ondemand_download()?,
.await?,
TEST_IMG(&data)
);
}
@@ -1574,21 +1515,19 @@ mod tests {
assert_current_logical_size(&tline, Lsn(lsn));
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(lsn), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?,
RELSEG_SIZE + 1
);
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE)?;
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE)
.await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(lsn), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?,
RELSEG_SIZE
);
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1596,12 +1535,12 @@ mod tests {
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1)?;
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1)
.await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(lsn), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?,
RELSEG_SIZE - 1
);
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1612,12 +1551,12 @@ mod tests {
while size >= 0 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)
.await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(lsn), false)
.no_ondemand_download()?,
tline.get_rel_size(TESTREL_A, Lsn(lsn), false).await?,
size as BlockNumber
);

View File

@@ -15,7 +15,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.extend(
[
".*Failed to reconstruct the page.*",
".*Failed to load delta layer.*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
@@ -99,7 +99,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# Third timeline will also fail during basebackup, because the layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
with pytest.raises(Exception, match="Failed to reconstruct the page") as err:
with pytest.raises(Exception, match="Failed to load delta layer") as err:
pg3.start()
log.info(
f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"