diff --git a/Cargo.lock b/Cargo.lock
index e26ba2a8c8..cd95ee5fdf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4641,7 +4641,6 @@ dependencies = [
  "pageserver_api",
  "postgres_ffi",
  "prost 0.13.5",
- "smallvec",
  "thiserror 1.0.69",
  "tonic 0.13.1",
  "tonic-build",
diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 444983bd18..7ca623f8e5 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -181,6 +181,7 @@ pub struct ConfigToml {
     pub virtual_file_io_engine: Option,
     pub ingest_batch_size: u64,
     pub max_vectored_read_bytes: MaxVectoredReadBytes,
+    pub max_get_vectored_keys: MaxGetVectoredKeys,
     pub image_compression: ImageCompressionAlgorithm,
     pub timeline_offloading: bool,
     pub ephemeral_bytes_per_memory_kb: usize,
@@ -229,7 +230,7 @@ pub enum PageServicePipeliningConfig {
 }
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct PageServicePipeliningConfigPipelined {
-    /// Causes runtime errors if larger than max get_vectored batch size.
+    /// Fails config parsing and validation if larger than `max_get_vectored_keys`.
     pub max_batch_size: NonZeroUsize,
     pub execution: PageServiceProtocolPipelinedExecutionStrategy,
     // The default below is such that new versions of the software can start
@@ -403,6 +404,16 @@ impl Default for EvictionOrder {
 #[serde(transparent)]
 pub struct MaxVectoredReadBytes(pub NonZeroUsize);
 
+#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(transparent)]
+pub struct MaxGetVectoredKeys(NonZeroUsize);
+
+impl MaxGetVectoredKeys {
+    pub fn get(&self) -> usize {
+        self.0.get()
+    }
+}
+
 /// Tenant-level configuration values, used for various purposes.
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 #[serde(default)]
@@ -587,6 +598,8 @@ pub mod defaults {
     /// That is, slightly above 128 kB.
     pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 130 * 1024; // 130 KiB
 
+    pub const DEFAULT_MAX_GET_VECTORED_KEYS: usize = 32;
+
     pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
         ImageCompressionAlgorithm::Zstd { level: Some(1) };
@@ -685,6 +698,9 @@ impl Default for ConfigToml {
             max_vectored_read_bytes: (MaxVectoredReadBytes(
                 NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
             )),
+            max_get_vectored_keys: (MaxGetVectoredKeys(
+                NonZeroUsize::new(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap(),
+            )),
             image_compression: (DEFAULT_IMAGE_COMPRESSION),
             timeline_offloading: true,
             ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 3d6de1f900..2947754817 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -156,7 +156,7 @@ impl PageserverClient {
         let mut client =
             PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
 
-        let request = proto::CheckRelExistsRequest::try_from(request)?;
+        let request = proto::CheckRelExistsRequest::from(request);
         let response = client.check_rel_exists(tonic::Request::new(request)).await;
 
         match response {
@@ -183,7 +183,7 @@ impl PageserverClient {
         let mut client =
             PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
 
-        let request = proto::GetRelSizeRequest::try_from(request)?;
+        let request = proto::GetRelSizeRequest::from(request);
         let response = client.get_rel_size(tonic::Request::new(request)).await;
 
         match response {
@@ -213,7 +213,7 @@ impl PageserverClient {
         let mut client =
             PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
 
-        let request = proto::GetPageRequest::try_from(request)?;
+        let request = proto::GetPageRequest::from(request);
 
         let request_stream = futures::stream::once(std::future::ready(request));
@@ -296,7 +296,7 @@ impl PageserverClient {
         let mut client =
             PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
 
-        let request = proto::GetDbSizeRequest::try_from(request)?;
+        let request = proto::GetDbSizeRequest::from(request);
         let response = client.get_db_size(tonic::Request::new(request)).await;
 
         match response {
@@ -331,7 +331,7 @@ impl PageserverClient {
             client = client.accept_compressed(tonic::codec::CompressionEncoding::Gzip);
         }
 
-        let request = proto::GetBaseBackupRequest::try_from(request)?;
+        let request = proto::GetBaseBackupRequest::from(request);
         let response = client.get_base_backup(tonic::Request::new(request)).await;
 
         match response {
diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml
index 4f62c77eb2..e643b5749b 100644
--- a/pageserver/page_api/Cargo.toml
+++ b/pageserver/page_api/Cargo.toml
@@ -9,7 +9,6 @@
 bytes.workspace = true
 pageserver_api.workspace = true
 postgres_ffi.workspace = true
 prost.workspace = true
-smallvec.workspace = true
 thiserror.workspace = true
 tonic.workspace = true
 utils.workspace = true
diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs
index 7e7d0eb32d..1a08d04cc1 100644
--- a/pageserver/page_api/src/model.rs
+++ b/pageserver/page_api/src/model.rs
@@ -9,12 +9,16 @@
 //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
 //!
 //! - Validate protocol invariants, via try_from() and try_into().
+//!
+//! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain
+//! types. This is where it matters -- the Protobuf types are less strict than the domain types, and
+//! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
+//! stream combinators without dealing with errors, and avoids validating the same message twice.
 
 use std::fmt::Display;
 
 use bytes::Bytes;
 use postgres_ffi::Oid;
-use smallvec::SmallVec;
 // TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid
 // pulling in all of their other crate dependencies when building the client.
 use utils::lsn::Lsn;
@@ -72,47 +76,35 @@ impl Display for ReadLsn {
     }
 }
 
-impl ReadLsn {
-    /// Validates the ReadLsn.
-    pub fn validate(&self) -> Result<(), ProtocolError> {
-        if self.request_lsn == Lsn::INVALID {
-            return Err(ProtocolError::invalid("request_lsn", self.request_lsn));
-        }
-        if self.not_modified_since_lsn > Some(self.request_lsn) {
-            return Err(ProtocolError::invalid(
-                "not_modified_since_lsn",
-                self.not_modified_since_lsn,
-            ));
-        }
-        Ok(())
-    }
-}
-
 impl TryFrom<proto::ReadLsn> for ReadLsn {
     type Error = ProtocolError;
 
     fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
-        let read_lsn = Self {
+        if pb.request_lsn == 0 {
+            return Err(ProtocolError::invalid("request_lsn", pb.request_lsn));
+        }
+        if pb.not_modified_since_lsn > pb.request_lsn {
+            return Err(ProtocolError::invalid(
+                "not_modified_since_lsn",
+                pb.not_modified_since_lsn,
+            ));
+        }
+        Ok(Self {
             request_lsn: Lsn(pb.request_lsn),
             not_modified_since_lsn: match pb.not_modified_since_lsn {
                 0 => None,
                 lsn => Some(Lsn(lsn)),
             },
-        };
-        read_lsn.validate()?;
-        Ok(read_lsn)
+        })
     }
 }
 
-impl TryFrom<ReadLsn> for proto::ReadLsn {
-    type Error = ProtocolError;
-
-    fn try_from(read_lsn: ReadLsn) -> Result<Self, Self::Error> {
-        read_lsn.validate()?;
-        Ok(Self {
+impl From<ReadLsn> for proto::ReadLsn {
+    fn from(read_lsn: ReadLsn) -> Self {
+        Self {
             request_lsn: read_lsn.request_lsn.0,
             not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
-        })
+        }
     }
 }
@@ -167,14 +159,12 @@ impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
     }
 }
 
-impl TryFrom<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
-    type Error = ProtocolError;
-
-    fn try_from(request: CheckRelExistsRequest) -> Result<Self, Self::Error> {
-        Ok(Self {
-            read_lsn: Some(request.read_lsn.try_into()?),
+impl From<CheckRelExistsRequest> for proto::CheckRelExistsRequest {
+    fn from(request: CheckRelExistsRequest) -> Self {
+        Self {
+            read_lsn: Some(request.read_lsn.into()),
             rel: Some(request.rel.into()),
-        })
+        }
     }
 }
@@ -215,14 +205,12 @@ impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
     }
 }
 
-impl TryFrom<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
-    type Error = ProtocolError;
-
-    fn try_from(request: GetBaseBackupRequest) -> Result<Self, Self::Error> {
-        Ok(Self {
-            read_lsn: Some(request.read_lsn.try_into()?),
+impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
+    fn from(request: GetBaseBackupRequest) -> Self {
+        Self {
+            read_lsn: Some(request.read_lsn.into()),
             replica: request.replica,
-        })
+        }
     }
 }
@@ -239,14 +227,9 @@ impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
     }
 }
 
-impl TryFrom<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
-    type Error = ProtocolError;
-
-    fn try_from(chunk: GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
-        if chunk.is_empty() {
-            return Err(ProtocolError::Missing("chunk"));
-        }
-        Ok(Self { chunk })
+impl From<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
+    fn from(chunk: GetBaseBackupResponseChunk) -> Self {
+        Self { chunk }
     }
 }
@@ -271,14 +254,12 @@ impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
     }
 }
 
-impl TryFrom<GetDbSizeRequest> for proto::GetDbSizeRequest {
-    type Error = ProtocolError;
-
-    fn try_from(request: GetDbSizeRequest) -> Result<Self, Self::Error> {
-        Ok(Self {
-            read_lsn: Some(request.read_lsn.try_into()?),
+impl From<GetDbSizeRequest> for proto::GetDbSizeRequest {
+    fn from(request: GetDbSizeRequest) -> Self {
+        Self {
+            read_lsn: Some(request.read_lsn.into()),
             db_oid: request.db_oid,
-        })
+        }
     }
 }
@@ -313,7 +294,7 @@ pub struct GetPageRequest {
     /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
     /// costs and parallelizing them. This may increase the latency of any individual request, but
     /// improves the overall latency and throughput of the batch as a whole.
-    pub block_numbers: SmallVec<[u32; 1]>,
+    pub block_numbers: Vec<u32>,
 }
@@ -331,25 +312,20 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
                 .ok_or(ProtocolError::Missing("read_lsn"))?
                 .try_into()?,
             rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
-            block_numbers: pb.block_number.into(),
+            block_numbers: pb.block_number,
         })
     }
 }
 
-impl TryFrom<GetPageRequest> for proto::GetPageRequest {
-    type Error = ProtocolError;
-
-    fn try_from(request: GetPageRequest) -> Result<Self, Self::Error> {
-        if request.block_numbers.is_empty() {
-            return Err(ProtocolError::Missing("block_number"));
-        }
-        Ok(Self {
+impl From<GetPageRequest> for proto::GetPageRequest {
+    fn from(request: GetPageRequest) -> Self {
+        Self {
             request_id: request.request_id,
             request_class: request.request_class.into(),
-            read_lsn: Some(request.read_lsn.try_into()?),
+            read_lsn: Some(request.read_lsn.into()),
             rel: Some(request.rel.into()),
-            block_number: request.block_numbers.into_vec(),
-        })
+            block_number: request.block_numbers,
+        }
     }
 }
@@ -421,7 +397,7 @@ pub struct GetPageResponse {
     /// A string describing the status, if any.
     pub reason: Option<String>,
     /// The 8KB page images, in the same order as the request. Empty if status != OK.
-    pub page_images: SmallVec<[Bytes; 1]>,
+    pub page_images: Vec<Bytes>,
 }
@@ -430,7 +406,7 @@ impl From<proto::GetPageResponse> for GetPageResponse {
             request_id: pb.request_id,
             status_code: pb.status_code.into(),
             reason: Some(pb.reason).filter(|r| !r.is_empty()),
-            page_images: pb.page_image.into(),
+            page_images: pb.page_image,
         }
     }
 }
@@ -441,7 +417,7 @@ impl From<GetPageResponse> for proto::GetPageResponse {
             request_id: response.request_id,
             status_code: response.status_code.into(),
             reason: response.reason.unwrap_or_default(),
-            page_image: response.page_images.into_vec(),
+            page_image: response.page_images,
         }
     }
 }
@@ -530,14 +506,12 @@ impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
     }
 }
 
-impl TryFrom<GetRelSizeRequest> for proto::GetRelSizeRequest {
-    type Error = ProtocolError;
-
-    fn try_from(request: GetRelSizeRequest) -> Result<Self, Self::Error> {
-        Ok(Self {
-            read_lsn: Some(request.read_lsn.try_into()?),
+impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
+    fn from(request: GetRelSizeRequest) -> Self {
+        Self {
+            read_lsn: Some(request.read_lsn.into()),
             rel: Some(request.rel.into()),
-        })
+        }
     }
 }
@@ -580,15 +554,13 @@ impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
     }
 }
 
-impl TryFrom<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
-    type Error = ProtocolError;
-
-    fn try_from(request: GetSlruSegmentRequest) -> Result<Self, Self::Error> {
-        Ok(Self {
-            read_lsn: Some(request.read_lsn.try_into()?),
+impl From<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
+    fn from(request: GetSlruSegmentRequest) -> Self {
+        Self {
+            read_lsn: Some(request.read_lsn.into()),
             kind: request.kind as u32,
             segno: request.segno,
-        })
+        }
     }
 }
@@ -605,15 +577,9 @@ impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
     }
 }
 
-impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
-    type Error = ProtocolError;
-
-    fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
-        // TODO: can a segment legitimately be empty?
-        if segment.is_empty() {
-            return Err(ProtocolError::Missing("segment"));
-        }
-        Ok(Self { segment })
+impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
+    fn from(segment: GetSlruSegmentResponse) -> Self {
+        Self { segment }
     }
 }
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 4dba9d267c..2a0548b811 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -371,7 +371,7 @@ where
             .await?
             .partition(
                 self.timeline.get_shard_identity(),
-                Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+                self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
             );
 
         let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 89f7539722..628d4f6021 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -14,7 +14,10 @@ use std::time::Duration;
 use anyhow::{Context, bail, ensure};
 use camino::{Utf8Path, Utf8PathBuf};
 use once_cell::sync::OnceCell;
-use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
+use pageserver_api::config::{
+    DiskUsageEvictionTaskConfig, MaxGetVectoredKeys, MaxVectoredReadBytes,
+    PageServicePipeliningConfig, PageServicePipeliningConfigPipelined, PostHogConfig,
+};
 use pageserver_api::models::ImageCompressionAlgorithm;
 use pageserver_api::shard::TenantShardId;
 use pem::Pem;
@@ -185,6 +188,9 @@ pub struct PageServerConf {
 
     pub max_vectored_read_bytes: MaxVectoredReadBytes,
 
+    /// Maximum number of keys to be read in a single get_vectored call.
+    pub max_get_vectored_keys: MaxGetVectoredKeys,
+
     pub image_compression: ImageCompressionAlgorithm,
 
     /// Whether to offload archived timelines automatically
@@ -404,6 +410,7 @@ impl PageServerConf {
             secondary_download_concurrency,
             ingest_batch_size,
             max_vectored_read_bytes,
+            max_get_vectored_keys,
             image_compression,
             timeline_offloading,
             ephemeral_bytes_per_memory_kb,
@@ -470,6 +477,7 @@ impl PageServerConf {
             secondary_download_concurrency,
             ingest_batch_size,
             max_vectored_read_bytes,
+            max_get_vectored_keys,
             image_compression,
             timeline_offloading,
             ephemeral_bytes_per_memory_kb,
@@ -598,6 +606,19 @@ impl PageServerConf {
                 )
             })?;
 
+        if let PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
+            max_batch_size,
+            ..
+        }) = conf.page_service_pipelining
+        {
+            if max_batch_size.get() > conf.max_get_vectored_keys.get() {
+                return Err(anyhow::anyhow!(
+                    "`max_batch_size` ({max_batch_size}) must be less than or equal to `max_get_vectored_keys` ({})",
+                    conf.max_get_vectored_keys.get()
+                ));
+            }
+        };
+
         Ok(conf)
     }
@@ -685,6 +706,7 @@ impl ConfigurableSemaphore {
 mod tests {
     use camino::Utf8PathBuf;
 
+    use rstest::rstest;
     use utils::id::NodeId;
 
     use super::PageServerConf;
@@ -724,4 +746,28 @@
         PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
             .expect_err("parse_and_validate should fail for endpoint without scheme");
     }
+
+    #[rstest]
+    #[case(32, 32, true)]
+    #[case(64, 32, false)]
+    #[case(64, 64, true)]
+    #[case(128, 128, true)]
+    fn test_config_max_batch_size_is_valid(
+        #[case] max_batch_size: usize,
+        #[case] max_get_vectored_keys: usize,
+        #[case] is_valid: bool,
+    ) {
+        let input = format!(
+            r#"
+            control_plane_api = "http://localhost:6666"
+            max_get_vectored_keys = {max_get_vectored_keys}
+            page_service_pipelining = {{ mode="pipelined", execution="concurrent-futures", max_batch_size={max_batch_size}, batching="uniform-lsn" }}
+            "#,
+        );
+        let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
+            .expect("config has valid fields");
+        let workdir = Utf8PathBuf::from("/nonexistent");
+        let result = PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir);
+        assert_eq!(result.is_ok(), is_valid);
+    }
 }
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index a9b2f1b7e0..4bef9b44a7 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -15,6 +15,7 @@ use metrics::{
     register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
 };
 use once_cell::sync::Lazy;
+use pageserver_api::config::defaults::DEFAULT_MAX_GET_VECTORED_KEYS;
 use pageserver_api::config::{
     PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
     PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
 };
@@ -32,7 +33,6 @@
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext};
 use crate::pgdatadir_mapping::DatadirModificationStats;
 use crate::task_mgr::TaskKind;
-use crate::tenant::Timeline;
 use crate::tenant::layer_map::LayerMap;
 use crate::tenant::mgr::TenantSlot;
 use crate::tenant::storage_layer::{InMemoryLayer, PersistentLayerDesc};
@@ -1939,7 +1939,7 @@ static SMGR_QUERY_TIME_GLOBAL: Lazy = Lazy::new(|| {
 });
 
 static PAGE_SERVICE_BATCH_SIZE_BUCKETS_GLOBAL: Lazy<Vec<f64>> = Lazy::new(|| {
-    (1..=u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap())
+    (1..=u32::try_from(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap())
         .map(|v| v.into())
         .collect()
 });
@@ -1957,7 +1957,7 @@ static PAGE_SERVICE_BATCH_SIZE_BUCKETS_PER_TIMELINE: Lazy<Vec<f64>> = Lazy::new(
     let mut buckets = Vec::new();
     for i in 0.. {
         let bucket = 1 << i;
-        if bucket > u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap() {
+        if bucket > u32::try_from(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap() {
             break;
         }
         buckets.push(bucket.into());
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b9ba4a3555..77e5f0a92b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -765,7 +765,7 @@ impl PageStreamError {
             request_id,
             status_code,
             reason: Some(status.message().to_string()),
-            page_images: SmallVec::new(),
+            page_images: Vec::new(),
         }
         .into())
     }
@@ -3521,7 +3521,7 @@ impl GrpcPageServiceHandler {
             request_id: req.request_id,
             status_code: page_api::GetPageStatusCode::Ok,
             reason: None,
-            page_images: SmallVec::with_capacity(results.len()),
+            page_images: Vec::with_capacity(results.len()),
         };
 
         for result in results {
@@ -3660,7 +3660,7 @@ impl proto::PageService for GrpcPageServiceHandler {
                 if chunk.is_empty() {
                     break;
                 }
-                yield proto::GetBaseBackupResponseChunk::try_from(chunk.clone().freeze())?;
+                yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
                 chunk.clear();
             }
         }
@@ -3806,7 +3806,7 @@ impl proto::PageService for GrpcPageServiceHandler {
         let resp =
             PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
         let resp: page_api::GetSlruSegmentResponse = resp.segment;
-        Ok(tonic::Response::new(resp.try_into()?))
+        Ok(tonic::Response::new(resp.into()))
     }
 }
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index b6f11b744b..633d62210d 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -431,10 +431,10 @@ impl Timeline {
                 GetVectoredError::InvalidLsn(e) => {
                     Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
                 }
-                // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
+                // NB: this should never happen in practice because we limit batch size to at most max_get_vectored_keys
                 // TODO: we can prevent this error class by moving this check into the type system
-                GetVectoredError::Oversized(err) => {
-                    Err(anyhow::anyhow!("batching oversized: {err:?}").into())
+                GetVectoredError::Oversized(err, max) => {
+                    Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
                 }
@@ -719,7 +719,7 @@ impl Timeline {
         let batches = keyspace.partition(
             self.get_shard_identity(),
-            Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+            self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
         );
 
         let io_concurrency = IoConcurrency::spawn_from_conf(
@@ -959,7 +959,7 @@ impl Timeline {
         let batches = keyspace.partition(
             self.get_shard_identity(),
-            Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+            self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
         );
 
         let io_concurrency = IoConcurrency::spawn_from_conf(
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 308ada3fa1..f9fdc143b4 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -7197,7 +7197,7 @@ mod tests {
             let end = desc
                 .key_range
                 .start
-                .add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
+                .add(tenant.conf.max_get_vectored_keys.get() as u32);
             reads.push(KeySpace {
                 ranges: vec![start..end],
             });
@@ -11260,11 +11260,11 @@ mod tests {
         let mut keyspaces_at_lsn: HashMap = HashMap::default();
         let mut used_keys: HashSet = HashSet::default();
 
-        while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
+        while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
            let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
            let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));
-            while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
+            while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
                 if used_keys.contains(&selected_key)
                     || selected_key >= start_key.add(KEY_DIMENSION_SIZE)
                 {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 9ddbe404d2..8fdf84b6d3 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -817,8 +817,8 @@ pub(crate) enum GetVectoredError {
     #[error("timeline shutting down")]
     Cancelled,
 
-    #[error("requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
-    Oversized(u64),
+    #[error("requested too many keys: {0} > {1}")]
+    Oversized(u64, u64),
 
     #[error("requested at invalid LSN: {0}")]
     InvalidLsn(Lsn),
@@ -1019,7 +1019,7 @@ impl From<GetVectoredError> for PageReconstructError {
         match e {
             GetVectoredError::Cancelled => PageReconstructError::Cancelled,
             GetVectoredError::InvalidLsn(_) => PageReconstructError::Other(anyhow!("Invalid LSN")),
-            err @ GetVectoredError::Oversized(_) => PageReconstructError::Other(err.into()),
+            err @ GetVectoredError::Oversized(_, _) => PageReconstructError::Other(err.into()),
             GetVectoredError::MissingKey(err) => PageReconstructError::MissingKey(err),
             GetVectoredError::GetReadyAncestorError(err) => PageReconstructError::from(err),
             GetVectoredError::Other(err) => PageReconstructError::Other(err),
@@ -1199,7 +1199,6 @@ impl Timeline {
         }
     }
 
-    pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u32 = 100;
 
     /// Look up multiple page versions at a given LSN
@@ -1214,9 +1213,12 @@ impl Timeline {
     ) -> Result>, GetVectoredError> {
         let total_keyspace = query.total_keyspace();
 
-        let key_count = total_keyspace.total_raw_size().try_into().unwrap();
-        if key_count > Timeline::MAX_GET_VECTORED_KEYS {
-            return Err(GetVectoredError::Oversized(key_count));
+        let key_count = total_keyspace.total_raw_size();
+        if key_count > self.conf.max_get_vectored_keys.get() {
+            return Err(GetVectoredError::Oversized(
+                key_count as u64,
+                self.conf.max_get_vectored_keys.get() as u64,
+            ));
         }
 
         for range in &total_keyspace.ranges {
@@ -5270,7 +5272,7 @@ impl Timeline {
                     key = key.next();
 
                     // Maybe flush `key_rest_accum`
-                    if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
+                    if key_request_accum.raw_size() >= self.conf.max_get_vectored_keys.get() as u64
                         || (last_key_in_range && key_request_accum.raw_size() > 0)
                     {
                         let query =
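Editor's note on the new `MaxGetVectoredKeys` type in libs/pageserver_api/src/config.rs: because it is a `#[serde(transparent)]` wrapper around `NonZeroUsize`, the setting is written as a bare integer in `pageserver.toml`, and a value of 0 is rejected during deserialization. Below is a minimal sketch of that behavior using the serde and toml crates with hypothetical struct names, not the pageserver's actual `ConfigToml`:

// Sketch only: hypothetical types, not the pageserver's config structs.
// Assumes the `serde` (with derive) and `toml` crates are available.
use std::num::NonZeroUsize;

use serde::Deserialize;

// #[serde(transparent)] makes the newtype (de)serialize exactly like the
// inner NonZeroUsize, so the TOML value is a bare integer.
#[derive(Copy, Clone, Debug, Deserialize)]
#[serde(transparent)]
struct MaxGetVectoredKeys(NonZeroUsize);

#[derive(Debug, Deserialize)]
struct ConfigSketch {
    max_get_vectored_keys: MaxGetVectoredKeys,
}

fn main() {
    let cfg: ConfigSketch = toml::from_str("max_get_vectored_keys = 32").unwrap();
    assert_eq!(cfg.max_get_vectored_keys.0.get(), 32);

    // NonZeroUsize refuses 0 during deserialization, so the whole parse fails.
    assert!(toml::from_str::<ConfigSketch>("max_get_vectored_keys = 0").is_err());
}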
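The model.rs changes move all validation to the receiving side: senders get infallible `From<Domain> for proto::*` conversions, while receivers keep the validating `TryFrom<proto::*>` conversions. Here is a standalone sketch of that pattern with hypothetical types (not the actual page_api structs), mirroring the `ReadLsn` invariants enforced in the diff:

// Illustrative only; field and type names are invented for the sketch.
#[derive(Debug)]
struct DomainLsn {
    request_lsn: u64,                // must be non-zero in the domain type
    not_modified_since: Option<u64>, // None is encoded as 0 on the wire
}

#[derive(Debug, Default)]
struct WireLsn {
    request_lsn: u64,
    not_modified_since_lsn: u64,
}

// Send path: cannot fail, so stream combinators never need error handling here.
impl From<DomainLsn> for WireLsn {
    fn from(d: DomainLsn) -> Self {
        Self {
            request_lsn: d.request_lsn,
            not_modified_since_lsn: d.not_modified_since.unwrap_or_default(),
        }
    }
}

// Receive path: the wire type is laxer than the domain type, so check invariants here.
impl TryFrom<WireLsn> for DomainLsn {
    type Error = String;

    fn try_from(w: WireLsn) -> Result<Self, Self::Error> {
        if w.request_lsn == 0 {
            return Err("request_lsn must be non-zero".into());
        }
        if w.not_modified_since_lsn > w.request_lsn {
            return Err("not_modified_since_lsn may not exceed request_lsn".into());
        }
        Ok(Self {
            request_lsn: w.request_lsn,
            not_modified_since: (w.not_modified_since_lsn != 0).then_some(w.not_modified_since_lsn),
        })
    }
}

fn main() {
    let wire = WireLsn::from(DomainLsn { request_lsn: 42, not_modified_since: None });
    assert_eq!(wire.not_modified_since_lsn, 0);
    assert!(DomainLsn::try_from(WireLsn { request_lsn: 0, ..Default::default() }).is_err());
}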
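For the `keyspace.partition(...)` call sites in basebackup.rs and pgdatadir_mapping.rs, the limit is expressed in bytes of key space rather than in keys, hence the multiplication by `BLCKSZ`. A quick arithmetic check under the default configuration (a sketch, not pageserver code):

fn main() {
    const BLCKSZ: u64 = 8192; // Postgres block size in bytes
    let max_get_vectored_keys: u64 = 32; // DEFAULT_MAX_GET_VECTORED_KEYS

    // Each vectored-read batch therefore spans at most 256 KiB of key space.
    assert_eq!(max_get_vectored_keys * BLCKSZ, 256 * 1024);
}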
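Note that the metrics.rs hunks keep the histogram buckets tied to `DEFAULT_MAX_GET_VECTORED_KEYS` (the compile-time default) rather than the runtime-configured limit, so raising `max_get_vectored_keys` in the config does not widen the buckets. A sketch of the two bucket layouts that code produces:

fn main() {
    const DEFAULT_MAX_GET_VECTORED_KEYS: u32 = 32;

    // Global histogram: one bucket per batch size, 1 through 32.
    let global: Vec<f64> = (1..=DEFAULT_MAX_GET_VECTORED_KEYS).map(f64::from).collect();
    assert_eq!(global.len(), 32);

    // Per-timeline histogram: powers of two capped at the default, to keep
    // per-timeline metric cardinality low.
    let mut per_timeline = Vec::new();
    for i in 0.. {
        let bucket = 1u32 << i;
        if bucket > DEFAULT_MAX_GET_VECTORED_KEYS {
            break;
        }
        per_timeline.push(f64::from(bucket));
    }
    assert_eq!(per_timeline, vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0]);
}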
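The reworked `GetVectoredError::Oversized` variant carries both the requested key count and the configured limit, and thiserror interpolates both into the message. An illustrative reproduction of just that variant (the real enum in timeline.rs has more variants):

// Sketch assuming the `thiserror` crate, which the repository already uses.
use thiserror::Error;

#[derive(Debug, Error)]
enum GetVectoredErrorSketch {
    #[error("requested too many keys: {0} > {1}")]
    Oversized(u64, u64),
}

fn main() {
    let err = GetVectoredErrorSketch::Oversized(40, 32);
    assert_eq!(err.to_string(), "requested too many keys: 40 > 32");
}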