mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-19 16:20:36 +00:00
Compare commits
4 Commits
release
...
rel_size_c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9788d7cbd | ||
|
|
374495c041 | ||
|
|
ca4d758504 | ||
|
|
d5a7ecade5 |
@@ -561,6 +561,21 @@ pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn key_to_rel_tag(key: Key) -> RelTag {
|
||||||
|
RelTag {
|
||||||
|
spcnode: key.field2,
|
||||||
|
dbnode: key.field3,
|
||||||
|
relnode: key.field4,
|
||||||
|
forknum: key.field5,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn key_to_blknum(key: Key) -> BlockNumber {
|
||||||
|
key.field6
|
||||||
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn rel_size_to_key(rel: RelTag) -> Key {
|
pub fn rel_size_to_key(rel: RelTag) -> Key {
|
||||||
Key {
|
Key {
|
||||||
|
|||||||
@@ -16,9 +16,9 @@ use bytes::{Buf, Bytes, BytesMut};
|
|||||||
use enum_map::Enum;
|
use enum_map::Enum;
|
||||||
use pageserver_api::key::{
|
use pageserver_api::key::{
|
||||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
||||||
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
|
TWOPHASEDIR_KEY, dbdir_key_range, key_to_blknum, key_to_rel_tag, rel_block_to_key,
|
||||||
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
|
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
|
||||||
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||||
};
|
};
|
||||||
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||||
@@ -259,7 +259,7 @@ impl Timeline {
|
|||||||
let mut result = Vec::with_capacity(pages.len());
|
let mut result = Vec::with_capacity(pages.len());
|
||||||
let result_slots = result.spare_capacity_mut();
|
let result_slots = result.spare_capacity_mut();
|
||||||
|
|
||||||
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, Lsn, RequestContext); 1]>> =
|
||||||
HashMap::with_capacity(pages.len());
|
HashMap::with_capacity(pages.len());
|
||||||
|
|
||||||
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
|
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
|
||||||
@@ -275,43 +275,6 @@ impl Timeline {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let nblocks = {
|
|
||||||
let ctx = RequestContextBuilder::from(&ctx)
|
|
||||||
.perf_span(|crnt_perf_span| {
|
|
||||||
info_span!(
|
|
||||||
target: PERF_TRACE_TARGET,
|
|
||||||
parent: crnt_perf_span,
|
|
||||||
"GET_REL_SIZE",
|
|
||||||
reltag=%tag,
|
|
||||||
lsn=%lsn,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.attached_child();
|
|
||||||
|
|
||||||
match self
|
|
||||||
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
|
|
||||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(nblocks) => nblocks,
|
|
||||||
Err(err) => {
|
|
||||||
result_slots[response_slot_idx].write(Err(err));
|
|
||||||
slots_filled += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if *blknum >= nblocks {
|
|
||||||
debug!(
|
|
||||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
|
||||||
tag, blknum, lsn, nblocks
|
|
||||||
);
|
|
||||||
result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
|
|
||||||
slots_filled += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let key = rel_block_to_key(*tag, *blknum);
|
let key = rel_block_to_key(*tag, *blknum);
|
||||||
|
|
||||||
let ctx = RequestContextBuilder::from(&ctx)
|
let ctx = RequestContextBuilder::from(&ctx)
|
||||||
@@ -326,7 +289,7 @@ impl Timeline {
|
|||||||
.attached_child();
|
.attached_child();
|
||||||
|
|
||||||
let key_slots = keys_slots.entry(key).or_default();
|
let key_slots = keys_slots.entry(key).or_default();
|
||||||
key_slots.push((response_slot_idx, ctx));
|
key_slots.push((response_slot_idx, lsn, ctx));
|
||||||
|
|
||||||
let acc = req_keyspaces.entry(lsn).or_default();
|
let acc = req_keyspaces.entry(lsn).or_default();
|
||||||
acc.add_key(key);
|
acc.add_key(key);
|
||||||
@@ -347,42 +310,95 @@ impl Timeline {
|
|||||||
Ok(results) => {
|
Ok(results) => {
|
||||||
for (key, res) in results {
|
for (key, res) in results {
|
||||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
|
||||||
|
|
||||||
for (slot, req_ctx) in key_slots {
|
// Try to check if error is caused by access beyond end of relation
|
||||||
let clone = match &res {
|
match &res {
|
||||||
Ok(buf) => Ok(buf.clone()),
|
Err(err) => {
|
||||||
Err(err) => Err(match err {
|
let tag = key_to_rel_tag(key);
|
||||||
PageReconstructError::Cancelled => PageReconstructError::Cancelled,
|
let blknum = key_to_blknum(key);
|
||||||
|
let mut first_error_slot: Option<usize> = None;
|
||||||
|
for (slot, lsn, req_ctx) in key_slots {
|
||||||
|
// Check relation size only in case of error
|
||||||
|
let relsize_ctx = RequestContextBuilder::from(&ctx)
|
||||||
|
.perf_span(|crnt_perf_span| {
|
||||||
|
info_span!(
|
||||||
|
target: PERF_TRACE_TARGET,
|
||||||
|
parent: crnt_perf_span,
|
||||||
|
"GET_REL_SIZE",
|
||||||
|
reltag=%tag,
|
||||||
|
lsn=%lsn,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.attached_child();
|
||||||
|
|
||||||
x @ PageReconstructError::Other(_)
|
if let Ok(nblocks) = self
|
||||||
| x @ PageReconstructError::AncestorLsnTimeout(_)
|
.get_rel_size(tag, Version::Lsn(lsn), &relsize_ctx)
|
||||||
| x @ PageReconstructError::WalRedo(_)
|
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||||
| x @ PageReconstructError::MissingKey(_) => {
|
crnt_perf_span.clone()
|
||||||
PageReconstructError::Other(anyhow::anyhow!(
|
})
|
||||||
"there was more than one request for this key in the batch, error logged once: {x:?}"
|
.await
|
||||||
))
|
{
|
||||||
|
if blknum >= nblocks {
|
||||||
|
debug!(
|
||||||
|
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||||
|
tag, blknum, lsn, nblocks
|
||||||
|
);
|
||||||
|
result_slots[slot].write(Ok(ZERO_PAGE.clone()));
|
||||||
|
slots_filled += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}),
|
if first_error_slot.is_none() {
|
||||||
};
|
first_error_slot = Some(slot);
|
||||||
|
} else {
|
||||||
|
let err = match err {
|
||||||
|
PageReconstructError::Cancelled => {
|
||||||
|
PageReconstructError::Cancelled
|
||||||
|
}
|
||||||
|
|
||||||
result_slots[slot].write(clone);
|
x @ PageReconstructError::Other(_)
|
||||||
// There is no standardized way to express that the batched span followed from N request spans.
|
| x @ PageReconstructError::AncestorLsnTimeout(_)
|
||||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
| x @ PageReconstructError::WalRedo(_)
|
||||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
| x @ PageReconstructError::MissingKey(_) => {
|
||||||
req_ctx.perf_follows_from(ctx);
|
PageReconstructError::Other(anyhow::anyhow!(
|
||||||
slots_filled += 1;
|
"there was more than one request for this key in the batch, error logged once: {x:?}"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
result_slots[slot].write(Err(err));
|
||||||
|
};
|
||||||
|
// There is no standardized way to express that the batched span followed from N request spans.
|
||||||
|
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||||
|
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||||
|
req_ctx.perf_follows_from(ctx);
|
||||||
|
slots_filled += 1;
|
||||||
|
}
|
||||||
|
if let Some(slot) = first_error_slot {
|
||||||
|
result_slots[slot].write(res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(buf) => {
|
||||||
|
let (first_slot, _first_lsn, first_req_ctx) = key_slots.next().unwrap();
|
||||||
|
|
||||||
|
for (slot, _lsn, req_ctx) in key_slots {
|
||||||
|
result_slots[slot].write(Ok(buf.clone()));
|
||||||
|
// There is no standardized way to express that the batched span followed from N request spans.
|
||||||
|
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||||
|
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||||
|
req_ctx.perf_follows_from(ctx);
|
||||||
|
slots_filled += 1;
|
||||||
|
}
|
||||||
|
result_slots[first_slot].write(res);
|
||||||
|
first_req_ctx.perf_follows_from(ctx);
|
||||||
|
slots_filled += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result_slots[first_slot].write(res);
|
|
||||||
first_req_ctx.perf_follows_from(ctx);
|
|
||||||
slots_filled += 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
for (slot, _lsn, req_ctx) in keys_slots.values().flatten() {
|
||||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||||
// but without taking ownership of the GetVectoredError
|
// but without taking ownership of the GetVectoredError
|
||||||
let err = match &err {
|
let err = match &err {
|
||||||
@@ -488,8 +504,6 @@ impl Timeline {
|
|||||||
let mut buf = version.get(self, key, ctx).await?;
|
let mut buf = version.get(self, key, ctx).await?;
|
||||||
let nblocks = buf.get_u32_le();
|
let nblocks = buf.get_u32_le();
|
||||||
|
|
||||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
|
||||||
|
|
||||||
Ok(nblocks)
|
Ok(nblocks)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1343,32 +1357,6 @@ impl Timeline {
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update cached relation size if there is no more recent update
|
|
||||||
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
|
||||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
|
||||||
|
|
||||||
if lsn < rel_size_cache.complete_as_of {
|
|
||||||
// Do not cache old values. It's safe to cache the size on read, as long as
|
|
||||||
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
|
|
||||||
// never evict values from the cache, so if the relation size changed after
|
|
||||||
// 'lsn', the new value is already in the cache.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
match rel_size_cache.map.entry(tag) {
|
|
||||||
hash_map::Entry::Occupied(mut entry) => {
|
|
||||||
let cached_lsn = entry.get_mut();
|
|
||||||
if lsn >= cached_lsn.0 {
|
|
||||||
*cached_lsn = (lsn, nblocks);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hash_map::Entry::Vacant(entry) => {
|
|
||||||
entry.insert((lsn, nblocks));
|
|
||||||
RELSIZE_CACHE_ENTRIES.inc();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store cached relation size
|
/// Store cached relation size
|
||||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||||
|
|||||||
@@ -199,11 +199,8 @@ pub struct TimelineResources {
|
|||||||
|
|
||||||
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
|
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
|
||||||
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
|
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
|
||||||
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
|
/// implicitly extends the relation.
|
||||||
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
|
|
||||||
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
|
|
||||||
pub(crate) struct RelSizeCache {
|
pub(crate) struct RelSizeCache {
|
||||||
pub(crate) complete_as_of: Lsn,
|
|
||||||
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
|
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2970,7 +2967,6 @@ impl Timeline {
|
|||||||
|
|
||||||
last_received_wal: Mutex::new(None),
|
last_received_wal: Mutex::new(None),
|
||||||
rel_size_cache: RwLock::new(RelSizeCache {
|
rel_size_cache: RwLock::new(RelSizeCache {
|
||||||
complete_as_of: disk_consistent_lsn,
|
|
||||||
map: HashMap::new(),
|
map: HashMap::new(),
|
||||||
}),
|
}),
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user