mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
feat(pageserver): use vectored_get in collect_keyspace
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -464,6 +464,18 @@ pub fn rel_size_to_key(rel: RelTag) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_size_key_to_rel(key: Key) -> RelTag {
|
||||
assert_eq!(key.field1, 0x00);
|
||||
assert_eq!(key.field6, 0xffff_ffff);
|
||||
RelTag {
|
||||
forknum: key.field5,
|
||||
spcnode: key.field2,
|
||||
dbnode: key.field3,
|
||||
relnode: key.field4,
|
||||
}
|
||||
}
|
||||
|
||||
impl Key {
|
||||
#[inline(always)]
|
||||
pub fn is_rel_size_key(&self) -> bool {
|
||||
@@ -559,6 +571,15 @@ pub fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn slru_segment_size_key_to_segno(key: Key) -> u32 {
|
||||
assert_eq!(key.field1, 0x01);
|
||||
assert_eq!(key.field3, 1);
|
||||
assert_eq!(key.field5, 0);
|
||||
assert_eq!(key.field6, 0xffff_ffff);
|
||||
key.field4
|
||||
}
|
||||
|
||||
impl Key {
|
||||
pub fn is_slru_segment_size_key(&self) -> bool {
|
||||
self.field1 == 0x01
|
||||
|
||||
@@ -17,20 +17,21 @@ use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
};
|
||||
use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::storage_layer::{IoConcurrency, ValuesReconstructState};
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
slru_segment_key_range, slru_segment_size_key_to_segno, slru_segment_size_to_key,
|
||||
twophase_file_key, twophase_key_range, CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY,
|
||||
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
};
|
||||
use pageserver_api::keyspace::SparseKeySpace;
|
||||
use pageserver_api::key::{rel_size_key_to_rel, Key};
|
||||
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
@@ -110,10 +111,21 @@ pub(crate) enum CollectKeySpaceError {
|
||||
Decode(#[from] DeserializeError),
|
||||
#[error(transparent)]
|
||||
PageRead(PageReconstructError),
|
||||
#[error(transparent)]
|
||||
GetVectored(GetVectoredError),
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for CollectKeySpaceError {
|
||||
fn from(err: GetVectoredError) -> Self {
|
||||
match err {
|
||||
GetVectoredError::Cancelled => Self::Cancelled,
|
||||
err => Self::GetVectored(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CollectKeySpaceError {
|
||||
fn from(err: PageReconstructError) -> Self {
|
||||
match err {
|
||||
@@ -1071,11 +1083,30 @@ impl Timeline {
|
||||
.into_iter()
|
||||
.collect();
|
||||
rels.sort_unstable();
|
||||
let mut relsize_keys_to_collect = KeySpaceRandomAccum::new();
|
||||
for rel in rels {
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
let mut buf = self.get(relsize_key, lsn, ctx).await?;
|
||||
relsize_keys_to_collect.add_key(relsize_key);
|
||||
}
|
||||
// Skip the vectored-read max key check by using `get_vectored_impl`.
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| CollectKeySpaceError::Cancelled)?,
|
||||
);
|
||||
let res = self
|
||||
.get_vectored_impl(
|
||||
relsize_keys_to_collect.consume_keyspace(),
|
||||
lsn,
|
||||
&mut ValuesReconstructState::new(io_concurrency),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (relsize_key, buf) in res {
|
||||
let mut buf = buf?;
|
||||
let relsize = buf.get_u32_le();
|
||||
|
||||
let rel = rel_size_key_to_rel(relsize_key);
|
||||
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
|
||||
result.add_key(relsize_key);
|
||||
}
|
||||
@@ -1094,11 +1125,30 @@ impl Timeline {
|
||||
let dir = SlruSegmentDirectory::des(&buf)?;
|
||||
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
|
||||
segments.sort_unstable();
|
||||
let mut segsize_keys_to_collect = KeySpaceRandomAccum::new();
|
||||
for segno in segments {
|
||||
let segsize_key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = self.get(segsize_key, lsn, ctx).await?;
|
||||
segsize_keys_to_collect.add_key(segsize_key);
|
||||
}
|
||||
// Skip the vectored-read max key check by using `get_vectored_impl`.
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| CollectKeySpaceError::Cancelled)?,
|
||||
);
|
||||
let res = self
|
||||
.get_vectored_impl(
|
||||
segsize_keys_to_collect.consume_keyspace(),
|
||||
lsn,
|
||||
&mut ValuesReconstructState::new(io_concurrency),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (segsize_key, buf) in res {
|
||||
let mut buf = buf?;
|
||||
let segsize = buf.get_u32_le();
|
||||
|
||||
let segno = slru_segment_size_key_to_segno(segsize_key);
|
||||
result.add_range(
|
||||
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
|
||||
);
|
||||
|
||||
@@ -1157,7 +1157,7 @@ impl Timeline {
|
||||
vectored_res
|
||||
}
|
||||
|
||||
pub(super) async fn get_vectored_impl(
|
||||
pub(crate) async fn get_vectored_impl(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
|
||||
Reference in New Issue
Block a user