diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index b2662c562a..1194ee93ef 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -104,7 +104,9 @@ pub struct ConfigToml { pub image_compression: ImageCompressionAlgorithm, pub ephemeral_bytes_per_memory_kb: usize, pub l0_flush: Option, - pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess, + #[serde(skip_serializing)] + // TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's + pub compact_level0_phase1_value_access: serde::de::IgnoredAny, pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode, pub io_buffer_alignment: usize, } @@ -209,43 +211,6 @@ pub enum GetImpl { #[serde(transparent)] pub struct MaxVectoredReadBytes(pub NonZeroUsize); -#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] -#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)] -pub enum CompactL0Phase1ValueAccess { - /// The old way. - PageCachedBlobIo, - /// The new way. - StreamingKmerge { - /// If set, we run both the old way and the new way, validate that - /// they are identical (=> [`CompactL0BypassPageCacheValidation`]), - /// and if the validation fails, - /// - in tests: fail them with a panic or - /// - in prod, log a rate-limited warning and use the old way's results. - /// - /// If not set, we only run the new way and trust its results. - validate: Option, - }, -} - -/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`]. -#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] -#[serde(rename_all = "kebab-case")] -pub enum CompactL0BypassPageCacheValidation { - /// Validate that the series of (key, lsn) pairs are the same. - KeyLsn, - /// Validate that the entire output of old and new way is identical. - KeyLsnValue, -} - -impl Default for CompactL0Phase1ValueAccess { - fn default() -> Self { - CompactL0Phase1ValueAccess::StreamingKmerge { - // TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident - validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue), - } - } -} - /// A tenant's calcuated configuration, which is the result of merging a /// tenant's TenantConfOpt with the global TenantConf from PageServerConf. /// @@ -452,7 +417,7 @@ impl Default for ConfigToml { image_compression: (DEFAULT_IMAGE_COMPRESSION), ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), l0_flush: None, - compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(), + compact_level0_phase1_value_access: Default::default(), virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(), io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT, diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 2c60e8d7d1..59194ab4bd 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -125,7 +125,6 @@ fn main() -> anyhow::Result<()> { // after setting up logging, log the effective IO engine choice and read path implementations info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings"); - info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access"); info!(?conf.io_buffer_alignment, "starting with setting for IO buffer alignment"); // The tenants directory contains all the pageserver local disk state. diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index c159b66905..4e68e276d3 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -174,10 +174,6 @@ pub struct PageServerConf { pub l0_flush: crate::l0_flush::L0FlushConfig, - /// This flag is temporary and will be removed after gradual rollout. - /// See . - pub compact_level0_phase1_value_access: pageserver_api::config::CompactL0Phase1ValueAccess, - /// Direct IO settings pub virtual_file_direct_io: virtual_file::DirectIoMode, @@ -338,7 +334,7 @@ impl PageServerConf { max_vectored_read_bytes, image_compression, ephemeral_bytes_per_memory_kb, - compact_level0_phase1_value_access, + compact_level0_phase1_value_access: _, l0_flush, virtual_file_direct_io, concurrent_tenant_warmup, @@ -383,7 +379,6 @@ impl PageServerConf { max_vectored_read_bytes, image_compression, ephemeral_bytes_per_memory_kb, - compact_level0_phase1_value_access, virtual_file_direct_io, io_buffer_alignment, @@ -561,6 +556,16 @@ mod tests { .expect("parse_and_validate"); } + #[test] + fn test_compactl0_phase1_access_mode_is_ignored_silently() { + let input = indoc::indoc! {r#" + [compact_level0_phase1_value_access] + mode = "streaming-kmerge" + validate = "key-lsn-value" + "#}; + toml_edit::de::from_str::(input).unwrap(); + } + /// If there's a typo in the pageserver config, we'd rather catch that typo /// and fail pageserver startup than silently ignoring the typo, leaving whoever /// made it in the believe that their config change is effective. @@ -637,14 +642,5 @@ mod tests { // some_invalid_field = 23 // "#} // ); - - test!( - compact_level0_phase1_value_access, - indoc! {r#" - [compact_level0_phase1_value_access] - mode = "streaming-kmerge" - some_invalid_field = 23 - "#} - ); } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 6b9c8386f7..a87b502cd6 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -19,7 +19,6 @@ use bytes::Bytes; use enumset::EnumSet; use fail::fail_point; use itertools::Itertools; -use pageserver_api::config::{CompactL0BypassPageCacheValidation, CompactL0Phase1ValueAccess}; use pageserver_api::key::KEY_SIZE; use pageserver_api::keyspace::ShardedRange; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; @@ -912,137 +911,13 @@ impl Timeline { // we're compacting, in key, LSN order. // If there's both a Value::Image and Value::WalRecord for the same (key,lsn), // then the Value::Image is ordered before Value::WalRecord. - // - // TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io - // option and validation code once we've reached confidence. - enum AllValuesIter<'a> { - PageCachedBlobIo { - all_keys_iter: VecIter<'a>, - }, - StreamingKmergeBypassingPageCache { - merge_iter: MergeIterator<'a>, - }, - ValidatingStreamingKmergeBypassingPageCache { - mode: CompactL0BypassPageCacheValidation, - merge_iter: MergeIterator<'a>, - all_keys_iter: VecIter<'a>, - }, - } - type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes - impl AllValuesIter<'_> { - async fn next_all_keys_iter( - iter: &mut VecIter<'_>, - ctx: &RequestContext, - ) -> anyhow::Result> { - let Some(DeltaEntry { - key, - lsn, - val: value_ref, - .. - }) = iter.next() - else { - return Ok(None); - }; - let value = value_ref.load(ctx).await?; - Ok(Some((*key, *lsn, value))) - } - async fn next( - &mut self, - ctx: &RequestContext, - ) -> anyhow::Result> { - match self { - AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => { - Self::next_all_keys_iter(iter, ctx).await - } - AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await, - AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async { - // advance both iterators - let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await; - let merge_iter_item = merge_iter.next().await; - // compare results & log warnings as needed - macro_rules! rate_limited_warn { - ($($arg:tt)*) => {{ - if cfg!(debug_assertions) || cfg!(feature = "testing") { - warn!($($arg)*); - panic!("CompactL0BypassPageCacheValidation failure, check logs"); - } - use once_cell::sync::Lazy; - use utils::rate_limit::RateLimit; - use std::sync::Mutex; - use std::time::Duration; - static LOGGED: Lazy> = - Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); - let mut rate_limit = LOGGED.lock().unwrap(); - rate_limit.call(|| { - warn!($($arg)*); - }); - }} - } - match (&all_keys_iter_item, &merge_iter_item) { - (Err(_), Err(_)) => { - // don't bother asserting equivality of the errors - } - (Err(all_keys), Ok(merge)) => { - rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}"); - }, - (Ok(all_keys), Err(merge)) => { - rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}"); - }, - (Ok(None), Ok(None)) => { } - (Ok(Some(all_keys)), Ok(None)) => { - rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some"); - } - (Ok(None), Ok(Some(merge))) => { - rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some"); - } - (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => { - match mode { - // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one - CompactL0BypassPageCacheValidation::KeyLsn => { - let all_keys = (all_keys_key, all_keys_lsn); - let merge = (merge_key, merge_lsn); - if all_keys != merge { - rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter"); - } - } - CompactL0BypassPageCacheValidation::KeyLsnValue => { - let all_keys = (all_keys_key, all_keys_lsn, all_keys_value); - let merge = (merge_key, merge_lsn, merge_value); - if all_keys != merge { - rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter"); - } - } - } - } - } - // in case of mismatch, trust the legacy all_keys_iter_item - all_keys_iter_item - }.instrument(info_span!("next")).await - } - } - } - let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access { - CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo { - all_keys_iter: all_keys.iter(), - }, - CompactL0Phase1ValueAccess::StreamingKmerge { validate } => { - let merge_iter = { - let mut deltas = Vec::with_capacity(deltas_to_compact.len()); - for l in deltas_to_compact.iter() { - let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?; - deltas.push(l); - } - MergeIterator::create(&deltas, &[], ctx) - }; - match validate { - None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter }, - Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { - mode: validate.clone(), - merge_iter, - all_keys_iter: all_keys.iter(), - }, - } + let mut all_values_iter = { + let mut deltas = Vec::with_capacity(deltas_to_compact.len()); + for l in deltas_to_compact.iter() { + let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?; + deltas.push(l); } + MergeIterator::create(&deltas, &[], ctx) }; // This iterator walks through all keys and is needed to calculate size used by each key @@ -1119,7 +994,7 @@ impl Timeline { let mut keys = 0; while let Some((key, lsn, value)) = all_values_iter - .next(ctx) + .next() .await .map_err(CompactionError::Other)? {