diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index dbd45da314..ea7fe74db0 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -464,6 +464,18 @@ pub fn rel_size_to_key(rel: RelTag) -> Key { } } +#[inline(always)] +pub fn rel_size_key_to_rel(key: Key) -> RelTag { + assert_eq!(key.field1, 0x00); + assert_eq!(key.field6, 0xffff_ffff); + RelTag { + forknum: key.field5, + spcnode: key.field2, + dbnode: key.field3, + relnode: key.field4, + } +} + impl Key { #[inline(always)] pub fn is_rel_size_key(&self) -> bool { @@ -559,6 +571,15 @@ pub fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key { } } +#[inline(always)] +pub fn slru_segment_size_key_to_segno(key: Key) -> u32 { + assert_eq!(key.field1, 0x01); + assert_eq!(key.field3, 1); + assert_eq!(key.field5, 0); + assert_eq!(key.field6, 0xffff_ffff); + key.field4 +} + impl Key { pub fn is_slru_segment_size_key(&self) -> bool { self.field1 == 0x01 diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 40c657524d..0339bdc391 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -17,20 +17,21 @@ use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, }; -use crate::tenant::storage_layer::IoConcurrency; +use crate::tenant::storage_layer::{IoConcurrency, ValuesReconstructState}; use crate::tenant::timeline::GetVectoredError; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; use itertools::Itertools; -use pageserver_api::key::Key; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, - slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range, - CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, 
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, + slru_segment_key_range, slru_segment_size_key_to_segno, slru_segment_size_to_key, + twophase_file_key, twophase_key_range, CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, + CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; -use pageserver_api::keyspace::SparseKeySpace; +use pageserver_api::key::{rel_size_key_to_rel, Key}; +use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace}; use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; @@ -110,10 +111,21 @@ pub(crate) enum CollectKeySpaceError { Decode(#[from] DeserializeError), #[error(transparent)] PageRead(PageReconstructError), + #[error(transparent)] + GetVectored(GetVectoredError), #[error("cancelled")] Cancelled, } +impl From<GetVectoredError> for CollectKeySpaceError { + fn from(err: GetVectoredError) -> Self { + match err { + GetVectoredError::Cancelled => Self::Cancelled, + err => Self::GetVectored(err), + } + } +} + impl From<PageReconstructError> for CollectKeySpaceError { fn from(err: PageReconstructError) -> Self { match err { @@ -1071,11 +1083,30 @@ impl Timeline { .into_iter() .collect(); rels.sort_unstable(); + let mut relsize_keys_to_collect = KeySpaceRandomAccum::new(); for rel in rels { let relsize_key = rel_size_to_key(rel); - let mut buf = self.get(relsize_key, lsn, ctx).await?; + relsize_keys_to_collect.add_key(relsize_key); + } + // Skip the vectored-read max key check by using `get_vectored_impl`. 
+ let io_concurrency = IoConcurrency::spawn_from_conf( + self.conf, + self.gate + .enter() + .map_err(|_| CollectKeySpaceError::Cancelled)?, + ); + let res = self + .get_vectored_impl( + relsize_keys_to_collect.consume_keyspace(), + lsn, + &mut ValuesReconstructState::new(io_concurrency), + ctx, + ) + .await?; + for (relsize_key, buf) in res { + let mut buf = buf?; let relsize = buf.get_u32_le(); - + let rel = rel_size_key_to_rel(relsize_key); result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize)); result.add_key(relsize_key); } @@ -1094,11 +1125,30 @@ impl Timeline { let dir = SlruSegmentDirectory::des(&buf)?; let mut segments: Vec<u32> = dir.segments.iter().cloned().collect(); segments.sort_unstable(); + let mut segsize_keys_to_collect = KeySpaceRandomAccum::new(); for segno in segments { let segsize_key = slru_segment_size_to_key(kind, segno); - let mut buf = self.get(segsize_key, lsn, ctx).await?; + segsize_keys_to_collect.add_key(segsize_key); + } + // Skip the vectored-read max key check by using `get_vectored_impl`. + let io_concurrency = IoConcurrency::spawn_from_conf( + self.conf, + self.gate + .enter() + .map_err(|_| CollectKeySpaceError::Cancelled)?, + ); + let res = self + .get_vectored_impl( + segsize_keys_to_collect.consume_keyspace(), + lsn, + &mut ValuesReconstructState::new(io_concurrency), + ctx, + ) + .await?; + for (segsize_key, buf) in res { + let mut buf = buf?; let segsize = buf.get_u32_le(); - + let segno = slru_segment_size_key_to_segno(segsize_key); result.add_range( slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize), ); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 076220df51..741e565b5f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1157,7 +1157,7 @@ impl Timeline { vectored_res } - pub(super) async fn get_vectored_impl( + pub(crate) async fn get_vectored_impl( &self, keyspace: KeySpace, lsn: Lsn,