Compare commits

...

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
d9788d7cbd Check relation size for gertpage only in case of error 2025-05-10 09:06:37 +03:00
Konstantin Knizhnik
374495c041 Check if page is not out of relation size in getpage for VM/FSM forks 2025-05-09 18:15:18 +03:00
Konstantin Knizhnik
ca4d758504 Make clippy happy 2025-05-09 17:17:43 +03:00
Konstantin Knizhnik
d5a7ecade5 Do not check relsize cache in getpage and do not update it in get_rel_size 2025-05-09 16:53:44 +03:00
3 changed files with 100 additions and 101 deletions

View File

@@ -561,6 +561,21 @@ pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
}
}
#[inline(always)]
pub fn key_to_rel_tag(key: Key) -> RelTag {
RelTag {
spcnode: key.field2,
dbnode: key.field3,
relnode: key.field4,
forknum: key.field5,
}
}
#[inline(always)]
pub fn key_to_blknum(key: Key) -> BlockNumber {
key.field6
}
#[inline(always)]
pub fn rel_size_to_key(rel: RelTag) -> Key {
Key {

View File

@@ -16,9 +16,9 @@ use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
TWOPHASEDIR_KEY, dbdir_key_range, key_to_blknum, key_to_rel_tag, rel_block_to_key,
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
};
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
@@ -259,7 +259,7 @@ impl Timeline {
let mut result = Vec::with_capacity(pages.len());
let result_slots = result.spare_capacity_mut();
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, Lsn, RequestContext); 1]>> =
HashMap::with_capacity(pages.len());
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
@@ -275,43 +275,6 @@ impl Timeline {
continue;
}
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
}
};
if *blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, lsn, nblocks
);
result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
slots_filled += 1;
continue;
}
let key = rel_block_to_key(*tag, *blknum);
let ctx = RequestContextBuilder::from(&ctx)
@@ -326,7 +289,7 @@ impl Timeline {
.attached_child();
let key_slots = keys_slots.entry(key).or_default();
key_slots.push((response_slot_idx, ctx));
key_slots.push((response_slot_idx, lsn, ctx));
let acc = req_keyspaces.entry(lsn).or_default();
acc.add_key(key);
@@ -347,42 +310,95 @@ impl Timeline {
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
for (slot, req_ctx) in key_slots {
let clone = match &res {
Ok(buf) => Ok(buf.clone()),
Err(err) => Err(match err {
PageReconstructError::Cancelled => PageReconstructError::Cancelled,
// Try to check if error is caused by access beyond end of relation
match &res {
Err(err) => {
let tag = key_to_rel_tag(key);
let blknum = key_to_blknum(key);
let mut first_error_slot: Option<usize> = None;
for (slot, lsn, req_ctx) in key_slots {
// Check relation size only in case of error
let relsize_ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.attached_child();
x @ PageReconstructError::Other(_)
| x @ PageReconstructError::AncestorLsnTimeout(_)
| x @ PageReconstructError::WalRedo(_)
| x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!(
"there was more than one request for this key in the batch, error logged once: {x:?}"
))
if let Ok(nblocks) = self
.get_rel_size(tag, Version::Lsn(lsn), &relsize_ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
crnt_perf_span.clone()
})
.await
{
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, lsn, nblocks
);
result_slots[slot].write(Ok(ZERO_PAGE.clone()));
slots_filled += 1;
continue;
}
}
}),
};
if first_error_slot.is_none() {
first_error_slot = Some(slot);
} else {
let err = match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
result_slots[slot].write(clone);
// There is no standardized way to express that the batched span followed from N request spans.
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
req_ctx.perf_follows_from(ctx);
slots_filled += 1;
x @ PageReconstructError::Other(_)
| x @ PageReconstructError::AncestorLsnTimeout(_)
| x @ PageReconstructError::WalRedo(_)
| x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!(
"there was more than one request for this key in the batch, error logged once: {x:?}"
))
}
};
result_slots[slot].write(Err(err));
};
// There is no standardized way to express that the batched span followed from N request spans.
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
req_ctx.perf_follows_from(ctx);
slots_filled += 1;
}
if let Some(slot) = first_error_slot {
result_slots[slot].write(res);
}
}
Ok(buf) => {
let (first_slot, _first_lsn, first_req_ctx) = key_slots.next().unwrap();
for (slot, _lsn, req_ctx) in key_slots {
result_slots[slot].write(Ok(buf.clone()));
// There is no standardized way to express that the batched span followed from N request spans.
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
req_ctx.perf_follows_from(ctx);
slots_filled += 1;
}
result_slots[first_slot].write(res);
first_req_ctx.perf_follows_from(ctx);
slots_filled += 1;
}
}
result_slots[first_slot].write(res);
first_req_ctx.perf_follows_from(ctx);
slots_filled += 1;
}
}
Err(err) => {
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
for (slot, req_ctx) in keys_slots.values().flatten() {
for (slot, _lsn, req_ctx) in keys_slots.values().flatten() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
let err = match &err {
@@ -488,8 +504,6 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
Ok(nblocks)
}
@@ -1343,32 +1357,6 @@ impl Timeline {
None
}
/// Update cached relation size if there is no more recent update
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if lsn < rel_size_cache.complete_as_of {
// Do not cache old values. It's safe to cache the size on read, as long as
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
// never evict values from the cache, so if the relation size changed after
// 'lsn', the new value is already in the cache.
return;
}
match rel_size_cache.map.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_CACHE_ENTRIES.inc();
}
}
}
/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();

View File

@@ -199,11 +199,8 @@ pub struct TimelineResources {
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
/// implicitly extends the relation.
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
}
@@ -2970,7 +2967,6 @@ impl Timeline {
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
complete_as_of: disk_consistent_lsn,
map: HashMap::new(),
}),