Compare commits

...

4 Commits

Author  SHA1  Message  Date
Konstantin Knizhnik  d9788d7cbd  Check relation size for gertpage only in case of error  2025-05-10 09:06:37 +03:00
Konstantin Knizhnik  374495c041  Check if page is not out of relation size in getpage for VM/FSM forks  2025-05-09 18:15:18 +03:00
Konstantin Knizhnik  ca4d758504  Make clippy happy  2025-05-09 17:17:43 +03:00
Konstantin Knizhnik  d5a7ecade5  Do not check relsize cache in getpage and do not update it in get_rel_size  2025-05-09 16:53:44 +03:00
3 changed files with 100 additions and 101 deletions
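
Taken together, the commits move the relation-size check out of the hot getpage path: the size is now looked up only when page reconstruction fails, and a request beyond the end of the relation is still answered with an all-zeros page. A minimal, self-contained sketch of that reordering (hypothetical names and simplified types, not the pageserver API):

// Hypothetical sketch only: read_page and rel_size stand in for the real
// batched get_vectored / get_rel_size calls shown in the diffs below.
type BlockNumber = u32;

fn get_page<E>(
    read_page: impl Fn(BlockNumber) -> Result<Vec<u8>, E>,
    rel_size: impl Fn() -> Result<BlockNumber, E>,
    blknum: BlockNumber,
) -> Result<Vec<u8>, E> {
    const PAGE_SIZE: usize = 8192;
    match read_page(blknum) {
        Ok(page) => Ok(page),
        Err(err) => {
            // Only after a failed read do we consult the relation size: a read past
            // EOF is expected to fail reconstruction and gets a zero page instead.
            if let Ok(nblocks) = rel_size() {
                if blknum >= nblocks {
                    return Ok(vec![0u8; PAGE_SIZE]);
                }
            }
            Err(err)
        }
    }
}

Previously the get_rel_size lookup ran unconditionally before every read in the batch; the second file below moves it into the error branch.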

View File

@@ -561,6 +561,21 @@ pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
     }
 }
 
+#[inline(always)]
+pub fn key_to_rel_tag(key: Key) -> RelTag {
+    RelTag {
+        spcnode: key.field2,
+        dbnode: key.field3,
+        relnode: key.field4,
+        forknum: key.field5,
+    }
+}
+
+#[inline(always)]
+pub fn key_to_blknum(key: Key) -> BlockNumber {
+    key.field6
+}
+
 #[inline(always)]
 pub fn rel_size_to_key(rel: RelTag) -> Key {
     Key {
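
The two helpers added above invert rel_block_to_key: they read the RelTag fields and the block number back out of a Key's field2..field6. A hedged round-trip check they should satisfy (the pageserver_api import paths are assumptions):

use pageserver_api::key::{key_to_blknum, key_to_rel_tag, rel_block_to_key};
use pageserver_api::reltag::RelTag;

// Illustrative only: encoding a (RelTag, block) pair into a Key and decoding it
// again should return the original values.
fn round_trips(rel: RelTag, blknum: u32) {
    let key = rel_block_to_key(rel, blknum);
    assert_eq!(key_to_rel_tag(key), rel);
    assert_eq!(key_to_blknum(key), blknum);
}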

View File

@@ -16,9 +16,9 @@ use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
 use pageserver_api::key::{
     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
-    TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
-    rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
-    repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
+    TWOPHASEDIR_KEY, dbdir_key_range, key_to_blknum, key_to_rel_tag, rel_block_to_key,
+    rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
+    relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
 };
 use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
@@ -259,7 +259,7 @@ impl Timeline {
         let mut result = Vec::with_capacity(pages.len());
         let result_slots = result.spare_capacity_mut();
 
-        let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
+        let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, Lsn, RequestContext); 1]>> =
             HashMap::with_capacity(pages.len());
 
         let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
@@ -275,43 +275,6 @@ impl Timeline {
                 continue;
             }
 
-            let nblocks = {
-                let ctx = RequestContextBuilder::from(&ctx)
-                    .perf_span(|crnt_perf_span| {
-                        info_span!(
-                            target: PERF_TRACE_TARGET,
-                            parent: crnt_perf_span,
-                            "GET_REL_SIZE",
-                            reltag=%tag,
-                            lsn=%lsn,
-                        )
-                    })
-                    .attached_child();
-
-                match self
-                    .get_rel_size(*tag, Version::Lsn(lsn), &ctx)
-                    .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
-                    .await
-                {
-                    Ok(nblocks) => nblocks,
-                    Err(err) => {
-                        result_slots[response_slot_idx].write(Err(err));
-                        slots_filled += 1;
-                        continue;
-                    }
-                }
-            };
-
-            if *blknum >= nblocks {
-                debug!(
-                    "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
-                    tag, blknum, lsn, nblocks
-                );
-                result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
-                slots_filled += 1;
-                continue;
-            }
-
             let key = rel_block_to_key(*tag, *blknum);
             let ctx = RequestContextBuilder::from(&ctx)
@@ -326,7 +289,7 @@ impl Timeline {
                 .attached_child();
 
             let key_slots = keys_slots.entry(key).or_default();
-            key_slots.push((response_slot_idx, ctx));
+            key_slots.push((response_slot_idx, lsn, ctx));
 
             let acc = req_keyspaces.entry(lsn).or_default();
             acc.add_key(key);
@@ -347,42 +310,95 @@ impl Timeline {
             Ok(results) => {
                 for (key, res) in results {
                     let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
-                    let (first_slot, first_req_ctx) = key_slots.next().unwrap();
-
-                    for (slot, req_ctx) in key_slots {
-                        let clone = match &res {
-                            Ok(buf) => Ok(buf.clone()),
-                            Err(err) => Err(match err {
-                                PageReconstructError::Cancelled => PageReconstructError::Cancelled,
-
-                                x @ PageReconstructError::Other(_)
-                                | x @ PageReconstructError::AncestorLsnTimeout(_)
-                                | x @ PageReconstructError::WalRedo(_)
-                                | x @ PageReconstructError::MissingKey(_) => {
-                                    PageReconstructError::Other(anyhow::anyhow!(
-                                        "there was more than one request for this key in the batch, error logged once: {x:?}"
-                                    ))
-                                }
-                            }),
-                        };
-
-                        result_slots[slot].write(clone);
-                        // There is no standardized way to express that the batched span followed from N request spans.
-                        // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
-                        // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
-                        req_ctx.perf_follows_from(ctx);
-                        slots_filled += 1;
+
+                    // Try to check if error is caused by access beyond end of relation
+                    match &res {
+                        Err(err) => {
+                            let tag = key_to_rel_tag(key);
+                            let blknum = key_to_blknum(key);
+                            let mut first_error_slot: Option<usize> = None;
+                            for (slot, lsn, req_ctx) in key_slots {
+                                // Check relation size only in case of error
+                                let relsize_ctx = RequestContextBuilder::from(&ctx)
+                                    .perf_span(|crnt_perf_span| {
+                                        info_span!(
+                                            target: PERF_TRACE_TARGET,
+                                            parent: crnt_perf_span,
+                                            "GET_REL_SIZE",
+                                            reltag=%tag,
+                                            lsn=%lsn,
+                                        )
+                                    })
+                                    .attached_child();
+                                if let Ok(nblocks) = self
+                                    .get_rel_size(tag, Version::Lsn(lsn), &relsize_ctx)
+                                    .maybe_perf_instrument(&ctx, |crnt_perf_span| {
+                                        crnt_perf_span.clone()
+                                    })
+                                    .await
+                                {
+                                    if blknum >= nblocks {
+                                        debug!(
+                                            "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
+                                            tag, blknum, lsn, nblocks
+                                        );
+                                        result_slots[slot].write(Ok(ZERO_PAGE.clone()));
+                                        slots_filled += 1;
+                                        continue;
+                                    }
+                                }
+                                if first_error_slot.is_none() {
+                                    first_error_slot = Some(slot);
+                                } else {
+                                    let err = match err {
+                                        PageReconstructError::Cancelled => {
+                                            PageReconstructError::Cancelled
+                                        }
+                                        x @ PageReconstructError::Other(_)
+                                        | x @ PageReconstructError::AncestorLsnTimeout(_)
+                                        | x @ PageReconstructError::WalRedo(_)
+                                        | x @ PageReconstructError::MissingKey(_) => {
+                                            PageReconstructError::Other(anyhow::anyhow!(
+                                                "there was more than one request for this key in the batch, error logged once: {x:?}"
+                                            ))
+                                        }
+                                    };
+                                    result_slots[slot].write(Err(err));
+                                };
+                                // There is no standardized way to express that the batched span followed from N request spans.
+                                // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
+                                // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
+                                req_ctx.perf_follows_from(ctx);
+                                slots_filled += 1;
+                            }
+                            if let Some(slot) = first_error_slot {
+                                result_slots[slot].write(res);
+                            }
+                        }
+                        Ok(buf) => {
+                            let (first_slot, _first_lsn, first_req_ctx) = key_slots.next().unwrap();
+                            for (slot, _lsn, req_ctx) in key_slots {
+                                result_slots[slot].write(Ok(buf.clone()));
+                                // There is no standardized way to express that the batched span followed from N request spans.
+                                // So, abuse the system and mark the request contexts as follows_from the batch span, so we get
+                                // some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
+                                req_ctx.perf_follows_from(ctx);
+                                slots_filled += 1;
+                            }
+                            result_slots[first_slot].write(res);
+                            first_req_ctx.perf_follows_from(ctx);
+                            slots_filled += 1;
+                        }
                     }
-
-                    result_slots[first_slot].write(res);
-                    first_req_ctx.perf_follows_from(ctx);
-                    slots_filled += 1;
                 }
             }
             Err(err) => {
                 // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
                 // (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
-                for (slot, req_ctx) in keys_slots.values().flatten() {
+                for (slot, _lsn, req_ctx) in keys_slots.values().flatten() {
                     // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
                     // but without taking ownership of the GetVectoredError
                     let err = match &err {
@@ -488,8 +504,6 @@ impl Timeline {
 
         let mut buf = version.get(self, key, ctx).await?;
         let nblocks = buf.get_u32_le();
 
-        self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
-
         Ok(nblocks)
     }
@@ -1343,32 +1357,6 @@ impl Timeline {
         None
     }
 
-    /// Update cached relation size if there is no more recent update
-    pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
-        let mut rel_size_cache = self.rel_size_cache.write().unwrap();
-
-        if lsn < rel_size_cache.complete_as_of {
-            // Do not cache old values. It's safe to cache the size on read, as long as
-            // the read was at an LSN since we started the WAL ingestion. Reasoning: we
-            // never evict values from the cache, so if the relation size changed after
-            // 'lsn', the new value is already in the cache.
-            return;
-        }
-
-        match rel_size_cache.map.entry(tag) {
-            hash_map::Entry::Occupied(mut entry) => {
-                let cached_lsn = entry.get_mut();
-                if lsn >= cached_lsn.0 {
-                    *cached_lsn = (lsn, nblocks);
-                }
-            }
-            hash_map::Entry::Vacant(entry) => {
-                entry.insert((lsn, nblocks));
-                RELSIZE_CACHE_ENTRIES.inc();
-            }
-        }
-    }
-
     /// Store cached relation size
     pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
         let mut rel_size_cache = self.rel_size_cache.write().unwrap();

View File

@@ -199,11 +199,8 @@ pub struct TimelineResources {
 
 /// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
 /// ingestion considerably, because WAL ingestion needs to check on most records if the record
-/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
-/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
-/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
+/// implicitly extends the relation.
 pub(crate) struct RelSizeCache {
-    pub(crate) complete_as_of: Lsn,
     pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
 }
 
@@ -2970,7 +2967,6 @@ impl Timeline {
             last_received_wal: Mutex::new(None),
             rel_size_cache: RwLock::new(RelSizeCache {
-                complete_as_of: disk_consistent_lsn,
                 map: HashMap::new(),
             }),
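
With complete_as_of and update_cached_rel_size removed, RelSizeCache is reduced to one (Lsn, BlockNumber) entry per relation, written during WAL ingestion via set_cached_rel_size and presumably usable only for reads at or after the recorded LSN. A hypothetical, self-contained illustration of that shape (stand-in types, not the pageserver structs):

use std::collections::HashMap;

// Stand-in aliases; the real types live in the pageserver / pageserver_api crates.
type Lsn = u64;
type BlockNumber = u32;
type RelTag = (u32, u32, u32, u8); // (spcnode, dbnode, relnode, forknum)

struct RelSizeCacheSketch {
    map: HashMap<RelTag, (Lsn, BlockNumber)>,
}

impl RelSizeCacheSketch {
    // Record a size observed during ingestion, keeping only the newest LSN
    // (same pattern as the removed update_cached_rel_size, minus the complete_as_of guard).
    fn set(&mut self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
        let entry = self.map.entry(tag).or_insert((lsn, nblocks));
        if lsn >= entry.0 {
            *entry = (lsn, nblocks);
        }
    }

    // A cached value is only assumed valid for reads at or after the LSN it was recorded at.
    fn get(&self, tag: &RelTag, read_lsn: Lsn) -> Option<BlockNumber> {
        self.map
            .get(tag)
            .and_then(|&(cached_lsn, nblocks)| (read_lsn >= cached_lsn).then_some(nblocks))
    }
}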