diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 444983bd18..7ca623f8e5 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -181,6 +181,7 @@ pub struct ConfigToml {
     pub virtual_file_io_engine: Option<crate::models::virtual_file::IoEngineKind>,
     pub ingest_batch_size: u64,
     pub max_vectored_read_bytes: MaxVectoredReadBytes,
+    pub max_get_vectored_keys: MaxGetVectoredKeys,
     pub image_compression: ImageCompressionAlgorithm,
     pub timeline_offloading: bool,
     pub ephemeral_bytes_per_memory_kb: usize,
@@ -229,7 +230,7 @@ pub enum PageServicePipeliningConfig {
 }
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub struct PageServicePipeliningConfigPipelined {
-    /// Causes runtime errors if larger than max get_vectored batch size.
+    /// Fails config parsing and validation if larger than `max_get_vectored_keys`.
     pub max_batch_size: NonZeroUsize,
     pub execution: PageServiceProtocolPipelinedExecutionStrategy,
     // The default below is such that new versions of the software can start
@@ -403,6 +404,16 @@ impl Default for EvictionOrder {
 #[serde(transparent)]
 pub struct MaxVectoredReadBytes(pub NonZeroUsize);

+#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(transparent)]
+pub struct MaxGetVectoredKeys(NonZeroUsize);
+
+impl MaxGetVectoredKeys {
+    pub fn get(&self) -> usize {
+        self.0.get()
+    }
+}
+
 /// Tenant-level configuration values, used for various purposes.
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 #[serde(default)]
@@ -587,6 +598,8 @@ pub mod defaults {
     /// That is, slightly above 128 kB.
     pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 130 * 1024; // 130 KiB

+    pub const DEFAULT_MAX_GET_VECTORED_KEYS: usize = 32;
+
     pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
         ImageCompressionAlgorithm::Zstd { level: Some(1) };

@@ -685,6 +698,9 @@ impl Default for ConfigToml {
             max_vectored_read_bytes: (MaxVectoredReadBytes(
                 NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
            )),
+            max_get_vectored_keys: (MaxGetVectoredKeys(
+                NonZeroUsize::new(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap(),
+            )),
             image_compression: (DEFAULT_IMAGE_COMPRESSION),
             timeline_offloading: true,
             ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
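For illustration, here is a minimal, self-contained sketch (not part of the patch; it assumes `serde` and `toml_edit` as dependencies, both already used by this crate) of how the `#[serde(transparent)]` newtype above behaves: a bare TOML integer deserializes straight into `MaxGetVectoredKeys`, and `NonZeroUsize` turns a zero value into a parse-time error rather than a runtime one.

```rust
use std::num::NonZeroUsize;

#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
struct MaxGetVectoredKeys(NonZeroUsize);

#[derive(Debug, serde::Deserialize)]
struct Conf {
    max_get_vectored_keys: MaxGetVectoredKeys,
}

fn main() {
    // `transparent` makes the wrapper invisible in the TOML: a bare
    // integer deserializes directly into the newtype.
    let conf: Conf = toml_edit::de::from_str("max_get_vectored_keys = 64").unwrap();
    assert_eq!(conf.max_get_vectored_keys.0.get(), 64);

    // NonZeroUsize rejects zero during deserialization.
    let zero = toml_edit::de::from_str::<Conf>("max_get_vectored_keys = 0");
    assert!(zero.is_err());
}
```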
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 4dba9d267c..2a0548b811 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -371,7 +371,7 @@ where
             .await?
             .partition(
                 self.timeline.get_shard_identity(),
-                Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+                self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
             );

         let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 89f7539722..628d4f6021 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -14,7 +14,10 @@ use std::time::Duration;
 use anyhow::{Context, bail, ensure};
 use camino::{Utf8Path, Utf8PathBuf};
 use once_cell::sync::OnceCell;
-use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
+use pageserver_api::config::{
+    DiskUsageEvictionTaskConfig, MaxGetVectoredKeys, MaxVectoredReadBytes,
+    PageServicePipeliningConfig, PageServicePipeliningConfigPipelined, PostHogConfig,
+};
 use pageserver_api::models::ImageCompressionAlgorithm;
 use pageserver_api::shard::TenantShardId;
 use pem::Pem;
@@ -185,6 +188,9 @@ pub struct PageServerConf {

     pub max_vectored_read_bytes: MaxVectoredReadBytes,

+    /// Maximum number of keys to be read in a single get_vectored call.
+    pub max_get_vectored_keys: MaxGetVectoredKeys,
+
     pub image_compression: ImageCompressionAlgorithm,

     /// Whether to offload archived timelines automatically
@@ -404,6 +410,7 @@ impl PageServerConf {
             secondary_download_concurrency,
             ingest_batch_size,
             max_vectored_read_bytes,
+            max_get_vectored_keys,
             image_compression,
             timeline_offloading,
             ephemeral_bytes_per_memory_kb,
@@ -470,6 +477,7 @@ impl PageServerConf {
             secondary_download_concurrency,
             ingest_batch_size,
             max_vectored_read_bytes,
+            max_get_vectored_keys,
             image_compression,
             timeline_offloading,
             ephemeral_bytes_per_memory_kb,
@@ -598,6 +606,19 @@ impl PageServerConf {
             )
         })?;

+        if let PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
+            max_batch_size,
+            ..
+        }) = conf.page_service_pipelining
+        {
+            if max_batch_size.get() > conf.max_get_vectored_keys.get() {
+                return Err(anyhow::anyhow!(
+                    "`max_batch_size` ({max_batch_size}) must be less than or equal to `max_get_vectored_keys` ({})",
+                    conf.max_get_vectored_keys.get()
+                ));
+            }
+        };
+
         Ok(conf)
     }
@@ -685,6 +706,7 @@ impl ConfigurableSemaphore {
 mod tests {
     use camino::Utf8PathBuf;

+    use rstest::rstest;
     use utils::id::NodeId;

     use super::PageServerConf;
@@ -724,4 +746,28 @@ mod tests {
         PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
             .expect_err("parse_and_validate should fail for endpoint without scheme");
     }
+
+    #[rstest]
+    #[case(32, 32, true)]
+    #[case(64, 32, false)]
+    #[case(64, 64, true)]
+    #[case(128, 128, true)]
+    fn test_config_max_batch_size_is_valid(
+        #[case] max_batch_size: usize,
+        #[case] max_get_vectored_keys: usize,
+        #[case] is_valid: bool,
+    ) {
+        let input = format!(
+            r#"
+            control_plane_api = "http://localhost:6666"
+            max_get_vectored_keys = {max_get_vectored_keys}
+            page_service_pipelining = {{ mode="pipelined", execution="concurrent-futures", max_batch_size={max_batch_size}, batching="uniform-lsn" }}
+        "#,
+        );
+        let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(&input)
+            .expect("config has valid fields");
+        let workdir = Utf8PathBuf::from("/nonexistent");
+        let result = PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir);
+        assert_eq!(result.is_ok(), is_valid);
+    }
 }
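The validation above turns what used to be a runtime error into a startup error. A standalone sketch of the invariant (names taken from the patch; the free function is hypothetical, the real check lives inside `parse_and_validate`):

```rust
// Sketch: a pipelined page-service batch can never exceed what a single
// get_vectored call accepts, so an inconsistent config is rejected up front.
fn check_batch_size(max_batch_size: usize, max_get_vectored_keys: usize) -> anyhow::Result<()> {
    if max_batch_size > max_get_vectored_keys {
        anyhow::bail!(
            "`max_batch_size` ({max_batch_size}) must be less than or equal to \
             `max_get_vectored_keys` ({max_get_vectored_keys})"
        );
    }
    Ok(())
}

fn main() {
    // Mirrors the rstest cases above: equal is fine, larger is not.
    assert!(check_batch_size(32, 32).is_ok());
    assert!(check_batch_size(64, 64).is_ok());
    assert!(check_batch_size(64, 32).is_err());
}
```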
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index a9b2f1b7e0..4bef9b44a7 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -15,6 +15,7 @@ use metrics::{
     register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
 };
 use once_cell::sync::Lazy;
+use pageserver_api::config::defaults::DEFAULT_MAX_GET_VECTORED_KEYS;
 use pageserver_api::config::{
     PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
     PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
@@ -32,7 +33,6 @@ use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext};
 use crate::pgdatadir_mapping::DatadirModificationStats;
 use crate::task_mgr::TaskKind;
-use crate::tenant::Timeline;
 use crate::tenant::layer_map::LayerMap;
 use crate::tenant::mgr::TenantSlot;
 use crate::tenant::storage_layer::{InMemoryLayer, PersistentLayerDesc};
@@ -1939,7 +1939,7 @@ static SMGR_QUERY_TIME_GLOBAL: Lazy = Lazy::new(|| {
 });

 static PAGE_SERVICE_BATCH_SIZE_BUCKETS_GLOBAL: Lazy<Vec<f64>> = Lazy::new(|| {
-    (1..=u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap())
+    (1..=u32::try_from(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap())
         .map(|v| v.into())
         .collect()
 });
@@ -1957,7 +1957,7 @@ static PAGE_SERVICE_BATCH_SIZE_BUCKETS_PER_TIMELINE: Lazy<Vec<f64>> = Lazy::new(
     let mut buckets = Vec::new();
     for i in 0.. {
         let bucket = 1 << i;
-        if bucket > u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap() {
+        if bucket > u32::try_from(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap() {
             break;
         }
         buckets.push(bucket.into());
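Note that the histogram buckets keep using the compile-time default rather than the configured value, presumably because these lazily registered statics are built independently of any runtime config. A sketch of the two bucket layouts they produce, assuming the default of 32:

```rust
const DEFAULT_MAX_GET_VECTORED_KEYS: usize = 32;

fn main() {
    // Global histogram: one bucket per possible batch size, 1.0 ..= 32.0.
    let global: Vec<f64> = (1..=u32::try_from(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap())
        .map(f64::from)
        .collect();
    assert_eq!(global.len(), DEFAULT_MAX_GET_VECTORED_KEYS);

    // Per-timeline histogram: coarser powers of two up to the default.
    let mut per_timeline = Vec::new();
    for i in 0u32.. {
        let bucket = 1u32 << i;
        if bucket > u32::try_from(DEFAULT_MAX_GET_VECTORED_KEYS).unwrap() {
            break;
        }
        per_timeline.push(f64::from(bucket));
    }
    assert_eq!(per_timeline, [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]);
}
```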
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index b6f11b744b..633d62210d 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -431,10 +431,10 @@ impl Timeline {
             GetVectoredError::InvalidLsn(e) => {
                 Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
             }
-            // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
+            // NB: this should never happen in practice because we cap batch sizes at `max_get_vectored_keys`
             // TODO: we can prevent this error class by moving this check into the type system
-            GetVectoredError::Oversized(err) => {
-                Err(anyhow::anyhow!("batching oversized: {err:?}").into())
+            GetVectoredError::Oversized(err, max) => {
+                Err(anyhow::anyhow!("batching oversized: {err} > {max}").into())
             }
         };

@@ -719,7 +719,7 @@ impl Timeline {
         let batches = keyspace.partition(
             self.get_shard_identity(),
-            Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+            self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
         );

         let io_concurrency = IoConcurrency::spawn_from_conf(
@@ -959,7 +959,7 @@ impl Timeline {
         let batches = keyspace.partition(
             self.get_shard_identity(),
-            Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+            self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
         );

         let io_concurrency = IoConcurrency::spawn_from_conf(
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 308ada3fa1..f9fdc143b4 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -7197,7 +7197,7 @@ mod tests {
         let end = desc
             .key_range
             .start
-            .add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
+            .add(tenant.conf.max_get_vectored_keys.get() as u32);
         reads.push(KeySpace {
             ranges: vec![start..end],
         });
@@ -11260,11 +11260,11 @@ mod tests {
         let mut keyspaces_at_lsn: HashMap<Lsn, KeySpaceRandomAccum> = HashMap::default();
         let mut used_keys: HashSet<Key> = HashSet::default();

-        while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
+        while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
             let selected_lsn = interesting_lsns.choose(&mut random).expect("not empty");
             let mut selected_key = start_key.add(random.gen_range(0..KEY_DIMENSION_SIZE));

-            while used_keys.len() < Timeline::MAX_GET_VECTORED_KEYS as usize {
+            while used_keys.len() < tenant.conf.max_get_vectored_keys.get() {
                 if used_keys.contains(&selected_key)
                     || selected_key >= start_key.add(KEY_DIMENSION_SIZE)
                 {
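The partition size passed to `keyspace.partition` is a byte budget derived from the key budget: each key addresses one Postgres block, so multiplying by `BLCKSZ` (8192 bytes in Postgres' standard build configuration) bounds every partition to at most `max_get_vectored_keys` keys. A worked example of that arithmetic, as a hypothetical standalone function:

```rust
const BLCKSZ: u64 = 8192; // standard Postgres block size

fn partition_bytes(max_get_vectored_keys: usize) -> u64 {
    max_get_vectored_keys as u64 * BLCKSZ
}

fn main() {
    // With the default of 32 keys, each partition spans at most 256 KiB,
    // i.e. at most 32 keys per get_vectored call.
    assert_eq!(partition_bytes(32), 256 * 1024);
    // Raising the setting scales the partition size linearly.
    assert_eq!(partition_bytes(128), 1024 * 1024);
}
```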
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 9ddbe404d2..8fdf84b6d3 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -817,8 +817,8 @@ pub(crate) enum GetVectoredError {
     #[error("timeline shutting down")]
     Cancelled,

-    #[error("requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
-    Oversized(u64),
+    #[error("requested too many keys: {0} > {1}")]
+    Oversized(u64, u64),

     #[error("requested at invalid LSN: {0}")]
     InvalidLsn(Lsn),
@@ -1019,7 +1019,7 @@ impl From<GetVectoredError> for PageReconstructError {
         match e {
             GetVectoredError::Cancelled => PageReconstructError::Cancelled,
             GetVectoredError::InvalidLsn(_) => PageReconstructError::Other(anyhow!("Invalid LSN")),
-            err @ GetVectoredError::Oversized(_) => PageReconstructError::Other(err.into()),
+            err @ GetVectoredError::Oversized(_, _) => PageReconstructError::Other(err.into()),
             GetVectoredError::MissingKey(err) => PageReconstructError::MissingKey(err),
             GetVectoredError::GetReadyAncestorError(err) => PageReconstructError::from(err),
             GetVectoredError::Other(err) => PageReconstructError::Other(err),
@@ -1199,7 +1199,6 @@ impl Timeline {
         }
     }

-    pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u32 = 100;

     /// Look up multiple page versions at a given LSN
@@ -1214,9 +1213,12 @@
     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
         let total_keyspace = query.total_keyspace();

-        let key_count = total_keyspace.total_raw_size().try_into().unwrap();
-        if key_count > Timeline::MAX_GET_VECTORED_KEYS {
-            return Err(GetVectoredError::Oversized(key_count));
+        let key_count = total_keyspace.total_raw_size();
+        if key_count > self.conf.max_get_vectored_keys.get() {
+            return Err(GetVectoredError::Oversized(
+                key_count as u64,
+                self.conf.max_get_vectored_keys.get() as u64,
+            ));
         }

         for range in &total_keyspace.ranges {
@@ -5270,7 +5272,7 @@ impl Timeline {
                 key = key.next();

                 // Maybe flush `key_rest_accum`
-                if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
+                if key_request_accum.raw_size() >= self.conf.max_get_vectored_keys.get() as u64
                     || (last_key_in_range && key_request_accum.raw_size() > 0)
                 {
                     let query =
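Finally, on the reworked error variant: carrying the limit inside `Oversized` keeps the message truthful now that the limit is per-pageserver config rather than a global constant. A minimal sketch with `thiserror` (which the `#[error]` attributes above suggest the crate already uses):

```rust
#[derive(Debug, thiserror::Error)]
enum GetVectoredError {
    // The second field is the configured limit, captured at the call site,
    // since there is no longer a single global MAX_GET_VECTORED_KEYS.
    #[error("requested too many keys: {0} > {1}")]
    Oversized(u64, u64),
}

fn main() {
    let err = GetVectoredError::Oversized(64, 32);
    assert_eq!(err.to_string(), "requested too many keys: 64 > 32");
}
```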