From 017c34b7736119f250c68c8f2aecfdee2866dc5f Mon Sep 17 00:00:00 2001
From: Alex Chi Z <chi@neon.tech>
Date: Tue, 7 May 2024 12:30:18 -0400
Subject: [PATCH] feat(pageserver): generate basebackup from aux file v2 storage (#7517)

This pull request adds the new basebackup read path and the aux file
write path. In the regression tests, all logical replication tests are
run across the aux file policy matrix (v1 and cross-validation).

It also fixes the vectored get code path to correctly return a missing
key error when called from the unified sequential get code path.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
---
 control_plane/src/pageserver.rs               |  19 +-
 libs/pageserver_api/src/models.rs             |  27 +-
 pageserver/src/aux_file.rs                    |  96 +++++++
 pageserver/src/pgdatadir_mapping.rs           | 241 ++++++++++++------
 pageserver/src/tenant.rs                      |   2 +-
 pageserver/src/tenant/config.rs               |  17 +-
 pageserver/src/tenant/timeline.rs             |  24 +-
 test_runner/fixtures/neon_fixtures.py         |  21 ++
 test_runner/fixtures/parametrize.py           |   6 +
 test_runner/fixtures/utils.py                 |  14 +
 .../regress/test_attach_tenant_config.py      |   2 +-
 .../regress/test_logical_replication.py       |  26 ++
 12 files changed, 391 insertions(+), 104 deletions(-)

diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index fbe0d419ae..2179859023 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -17,7 +17,8 @@ use anyhow::{bail, Context};
 use camino::Utf8PathBuf;
 use futures::SinkExt;
 use pageserver_api::models::{
-    self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
+    self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo,
+    TimelineInfo,
 };
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api;
@@ -429,11 +430,11 @@ impl PageServerNode {
                 .map(serde_json::from_str)
                 .transpose()
                 .context("parse `timeline_get_throttle` from json")?,
-            switch_to_aux_file_v2: settings
-                .remove("switch_to_aux_file_v2")
-                .map(|x| x.parse::<bool>())
+            switch_aux_file_policy: settings
+                .remove("switch_aux_file_policy")
+                .map(|x| x.parse::<AuxFilePolicy>())
                 .transpose()
-                .context("Failed to parse 'switch_to_aux_file_v2' as bool")?,
+                .context("Failed to parse 'switch_aux_file_policy'")?,
         };
         if !settings.is_empty() {
             bail!("Unrecognized tenant settings: {settings:?}")
@@ -552,11 +553,11 @@ impl PageServerNode {
                 .map(serde_json::from_str)
                 .transpose()
                 .context("parse `timeline_get_throttle` from json")?,
-            switch_to_aux_file_v2: settings
-                .remove("switch_to_aux_file_v2")
-                .map(|x| x.parse::<bool>())
+            switch_aux_file_policy: settings
+                .remove("switch_aux_file_policy")
+                .map(|x| x.parse::<AuxFilePolicy>())
                 .transpose()
-                .context("Failed to parse 'switch_to_aux_file_v2' as bool")?,
+                .context("Failed to parse 'switch_aux_file_policy'")?,
         }
     };
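Note: the `switch_aux_file_policy` setting parsed above relies on the `FromStr` impl added in the models.rs hunk below. As a quick reference, a minimal sketch of the expected parsing behavior (an illustrative test, not part of the patch; it assumes `AuxFilePolicy` is exported from `pageserver_api::models` as in that hunk):

    use std::str::FromStr;
    use pageserver_api::models::AuxFilePolicy;

    fn main() -> anyhow::Result<()> {
        // The input is lowercased first, so matching is case-insensitive.
        assert_eq!(AuxFilePolicy::from_str("V2")?, AuxFilePolicy::V2);
        // Both spellings of cross-validation are accepted.
        assert_eq!(
            AuxFilePolicy::from_str("cross_validation")?,
            AuxFilePolicy::CrossValidation
        );
        // Anything else bails with a parse error.
        assert!(AuxFilePolicy::from_str("v3").is_err());
        Ok(())
    }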
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 37d968cebd..1df5820fb9 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -9,6 +9,7 @@ use std::{
     collections::HashMap,
     io::{BufRead, Read},
     num::{NonZeroU64, NonZeroUsize},
+    str::FromStr,
     time::{Duration, SystemTime},
 };
 
@@ -304,7 +305,31 @@ pub struct TenantConfig {
     pub lazy_slru_download: Option<bool>,
     pub timeline_get_throttle: Option<ThrottleConfig>,
     pub image_layer_creation_check_threshold: Option<u8>,
-    pub switch_to_aux_file_v2: Option<bool>,
+    pub switch_aux_file_policy: Option<AuxFilePolicy>,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum AuxFilePolicy {
+    V1,
+    V2,
+    CrossValidation,
+}
+
+impl FromStr for AuxFilePolicy {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let s = s.to_lowercase();
+        if s == "v1" {
+            Ok(Self::V1)
+        } else if s == "v2" {
+            Ok(Self::V2)
+        } else if s == "crossvalidation" || s == "cross_validation" {
+            Ok(Self::CrossValidation)
+        } else {
+            anyhow::bail!("cannot parse {} to aux file policy", s)
+        }
+    }
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
diff --git a/pageserver/src/aux_file.rs b/pageserver/src/aux_file.rs
index a343acaf7a..a26ed84a0d 100644
--- a/pageserver/src/aux_file.rs
+++ b/pageserver/src/aux_file.rs
@@ -1,3 +1,4 @@
+use bytes::{Buf, BufMut, Bytes};
 use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
 use tracing::warn;
 
@@ -61,6 +62,84 @@ pub fn encode_aux_file_key(path: &str) -> Key {
     }
 }
 
+const AUX_FILE_ENCODING_VERSION: u8 = 0x01;
+
+pub fn decode_file_value(val: &[u8]) -> anyhow::Result<Vec<(&str, &[u8])>> {
+    let mut ptr = val;
+    if ptr.is_empty() {
+        // empty value = no files
+        return Ok(Vec::new());
+    }
+    assert_eq!(
+        ptr.get_u8(),
+        AUX_FILE_ENCODING_VERSION,
+        "unsupported aux file value"
+    );
+    let mut files = vec![];
+    while ptr.has_remaining() {
+        let key_len = ptr.get_u32() as usize;
+        let key = &ptr[..key_len];
+        ptr.advance(key_len);
+        let val_len = ptr.get_u32() as usize;
+        let content = &ptr[..val_len];
+        ptr.advance(val_len);
+
+        let path = std::str::from_utf8(key)?;
+        files.push((path, content));
+    }
+    Ok(files)
+}
+
+/// Decode an aux file key-value pair into a list of files. The returned `Bytes` contains reference
+/// to the original value slice. Be cautious about memory consumption.
+pub fn decode_file_value_bytes(val: &Bytes) -> anyhow::Result<Vec<(String, Bytes)>> {
+    let mut ptr = val.clone();
+    if ptr.is_empty() {
+        // empty value = no files
+        return Ok(Vec::new());
+    }
+    assert_eq!(
+        ptr.get_u8(),
+        AUX_FILE_ENCODING_VERSION,
+        "unsupported aux file value"
+    );
+    let mut files = vec![];
+    while ptr.has_remaining() {
+        let key_len = ptr.get_u32() as usize;
+        let key = ptr.slice(..key_len);
+        ptr.advance(key_len);
+        let val_len = ptr.get_u32() as usize;
+        let content = ptr.slice(..val_len);
+        ptr.advance(val_len);
+
+        let path = std::str::from_utf8(&key)?.to_string();
+        files.push((path, content));
+    }
+    Ok(files)
+}
+
+pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
+    if files.is_empty() {
+        // no files = empty value
+        return Ok(Vec::new());
+    }
+    let mut encoded = vec![];
+    encoded.put_u8(AUX_FILE_ENCODING_VERSION);
+    for (path, content) in files {
+        if path.len() > u32::MAX as usize {
+            anyhow::bail!("{} exceeds path size limit", path);
+        }
+        encoded.put_u32(path.len() as u32);
+        encoded.put_slice(path.as_bytes());
+        if content.len() > u32::MAX as usize {
+            anyhow::bail!("{} exceeds content size limit", path);
+        }
+        encoded.put_u32(content.len() as u32);
+        encoded.put_slice(content);
+    }
+    Ok(encoded)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -109,4 +188,21 @@ mod tests {
             encode_aux_file_key("other_file_not_supported").to_string()
         );
     }
+
+    #[test]
+    fn test_value_encoding() {
+        let files = vec![
+            ("pg_logical/1.file", "1111".as_bytes()),
+            ("pg_logical/2.file", "2222".as_bytes()),
+        ];
+        assert_eq!(
+            files,
+            decode_file_value(&encode_file_value(&files).unwrap()).unwrap()
+        );
+        let files = vec![];
+        assert_eq!(
+            files,
+            decode_file_value(&encode_file_value(&files).unwrap()).unwrap()
+        );
+    }
 }
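Note: the value format implemented by `encode_file_value`/`decode_file_value` above is a one-byte version tag (0x01) followed by length-prefixed path/content pairs, each length written as a big-endian `u32` (that is what the `bytes` crate's `put_u32` emits). A minimal sketch of the resulting wire layout for a single file (illustrative only, not part of the patch):

    use bytes::BufMut;

    fn main() {
        // Mirrors encode_file_value for one file ("a", b"xy").
        let mut v: Vec<u8> = Vec::new();
        v.put_u8(0x01); // AUX_FILE_ENCODING_VERSION
        v.put_u32(1); // path length, big-endian u32
        v.put_slice(b"a"); // path bytes
        v.put_u32(2); // content length, big-endian u32
        v.put_slice(b"xy"); // content bytes
        assert_eq!(v, [0x01, 0, 0, 0, 1, b'a', 0, 0, 0, 2, b'x', b'y']);
    }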
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 12314c5961..a4215ee107 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -10,9 +10,9 @@ use super::tenant::{PageReconstructError, Timeline};
 use crate::context::RequestContext;
 use crate::keyspace::{KeySpace, KeySpaceAccum};
 use crate::metrics::WAL_INGEST;
-use crate::repository::*;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
 use crate::walrecord::NeonWalRecord;
+use crate::{aux_file, repository::*};
 use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
@@ -24,6 +24,7 @@ use pageserver_api::key::{
     AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
 use pageserver_api::keyspace::SparseKeySpace;
+use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
@@ -670,7 +671,7 @@ impl Timeline {
         self.get(CHECKPOINT_KEY, lsn, ctx).await
     }
 
-    pub(crate) async fn list_aux_files(
+    async fn list_aux_files_v1(
         &self,
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
@@ -688,6 +689,63 @@ impl Timeline {
         }
     }
 
+    async fn list_aux_files_v2(
+        &self,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
+        let kv = self
+            .scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
+            .await
+            .context("scan")?;
+        let mut result = HashMap::new();
+        for (_, v) in kv {
+            let v = v.context("get value")?;
+            let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
+            for (fname, content) in v {
+                result.insert(fname, content);
+            }
+        }
+        Ok(result)
+    }
+
+    pub(crate) async fn list_aux_files(
+        &self,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
+        match self.get_switch_aux_file_policy() {
+            AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
+            AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
+            AuxFilePolicy::CrossValidation => {
+                let v1_result = self.list_aux_files_v1(lsn, ctx).await;
+                let v2_result = self.list_aux_files_v2(lsn, ctx).await;
+                match (v1_result, v2_result) {
+                    (Ok(v1), Ok(v2)) => {
+                        if v1 != v2 {
+                            tracing::error!(
+                                "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
+                            );
+                            return Err(PageReconstructError::Other(anyhow::anyhow!(
+                                "unmatched aux file v1 v2 result"
+                            )));
+                        }
+                        Ok(v1)
+                    }
+                    (Ok(_), Err(v2)) => {
+                        tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
+                        Err(v2)
+                    }
+                    (Err(v1), Ok(_)) => {
+                        tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
+                        Err(v1)
+                    }
+                    (Err(_), Err(v2)) => Err(v2),
+                }
+            }
+        }
+    }
+
     /// Does the same as get_current_logical_size but counted on demand.
     /// Used to initialize the logical size tracking on startup.
     ///
@@ -1389,6 +1447,9 @@ impl<'a> DatadirModification<'a> {
     }
 
     pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
+        if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
+            return Ok(());
+        }
         let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
             files: HashMap::new(),
         })?;
@@ -1404,89 +1465,121 @@ impl<'a> DatadirModification<'a> {
         content: &[u8],
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let file_path = path.to_string();
-        let content = if content.is_empty() {
-            None
-        } else {
-            Some(Bytes::copy_from_slice(content))
-        };
-
-        let n_files;
-        let mut aux_files = self.tline.aux_files.lock().await;
-        if let Some(mut dir) = aux_files.dir.take() {
-            // We already updated aux files in `self`: emit a delta and update our latest value.
-            dir.upsert(file_path.clone(), content.clone());
-            n_files = dir.files.len();
-            if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
-                self.put(
-                    AUX_FILES_KEY,
-                    Value::Image(Bytes::from(
-                        AuxFilesDirectory::ser(&dir).context("serialize")?,
-                    )),
-                );
-                aux_files.n_deltas = 0;
+        let policy = self.tline.get_switch_aux_file_policy();
+        if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
+            let key = aux_file::encode_aux_file_key(path);
+            // retrieve the key from the engine
+            let old_val = match self.get(key, ctx).await {
+                Ok(val) => Some(val),
+                Err(PageReconstructError::MissingKey(_)) => None,
+                Err(e) => return Err(e.into()),
+            };
+            let files = if let Some(ref old_val) = old_val {
+                aux_file::decode_file_value(old_val)?
             } else {
-                self.put(
-                    AUX_FILES_KEY,
-                    Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
-                );
-                aux_files.n_deltas += 1;
-            }
-            aux_files.dir = Some(dir);
-        } else {
-            // Check if the AUX_FILES_KEY is initialized
-            match self.get(AUX_FILES_KEY, ctx).await {
-                Ok(dir_bytes) => {
-                    let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
-                    // Key is already set, we may append a delta
-                    self.put(
-                        AUX_FILES_KEY,
-                        Value::WalRecord(NeonWalRecord::AuxFile {
-                            file_path: file_path.clone(),
-                            content: content.clone(),
-                        }),
-                    );
-                    dir.upsert(file_path, content);
-                    n_files = dir.files.len();
-                    aux_files.dir = Some(dir);
-                }
-                Err(
-                    e @ (PageReconstructError::AncestorStopping(_)
-                    | PageReconstructError::Cancelled
-                    | PageReconstructError::AncestorLsnTimeout(_)),
-                ) => {
-                    // Important that we do not interpret a shutdown error as "not found" and thereby
-                    // reset the map.
-                    return Err(e.into());
-                }
-                // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
-                // the original code assumes all other errors are missing keys. Therefore, we keep the code path
-                // the same for now, though in theory, we should only match the `MissingKey` variant.
-                Err(
-                    PageReconstructError::Other(_)
-                    | PageReconstructError::WalRedo(_)
-                    | PageReconstructError::MissingKey { .. },
-                ) => {
-                    // Key is missing, we must insert an image as the basis for subsequent deltas.
+                Vec::new()
+            };
+            let new_files = if content.is_empty() {
+                files
+                    .into_iter()
+                    .filter(|(p, _)| &path != p)
+                    .collect::<Vec<_>>()
+            } else {
+                files
+                    .into_iter()
+                    .filter(|(p, _)| &path != p)
+                    .chain(std::iter::once((path, content)))
+                    .collect::<Vec<_>>()
+            };
+            let new_val = aux_file::encode_file_value(&new_files)?;
+            self.put(key, Value::Image(new_val.into()));
+        }
 
-                    let mut dir = AuxFilesDirectory {
-                        files: HashMap::new(),
-                    };
-                    dir.upsert(file_path, content);
+        if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
+            let file_path = path.to_string();
+            let content = if content.is_empty() {
+                None
+            } else {
+                Some(Bytes::copy_from_slice(content))
+            };
+
+            let n_files;
+            let mut aux_files = self.tline.aux_files.lock().await;
+            if let Some(mut dir) = aux_files.dir.take() {
+                // We already updated aux files in `self`: emit a delta and update our latest value.
+ dir.upsert(file_path.clone(), content.clone()); + n_files = dir.files.len(); + if aux_files.n_deltas == MAX_AUX_FILE_DELTAS { self.put( AUX_FILES_KEY, Value::Image(Bytes::from( AuxFilesDirectory::ser(&dir).context("serialize")?, )), ); - n_files = 1; - aux_files.dir = Some(dir); + aux_files.n_deltas = 0; + } else { + self.put( + AUX_FILES_KEY, + Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }), + ); + aux_files.n_deltas += 1; + } + aux_files.dir = Some(dir); + } else { + // Check if the AUX_FILES_KEY is initialized + match self.get(AUX_FILES_KEY, ctx).await { + Ok(dir_bytes) => { + let mut dir = AuxFilesDirectory::des(&dir_bytes)?; + // Key is already set, we may append a delta + self.put( + AUX_FILES_KEY, + Value::WalRecord(NeonWalRecord::AuxFile { + file_path: file_path.clone(), + content: content.clone(), + }), + ); + dir.upsert(file_path, content); + n_files = dir.files.len(); + aux_files.dir = Some(dir); + } + Err( + e @ (PageReconstructError::AncestorStopping(_) + | PageReconstructError::Cancelled + | PageReconstructError::AncestorLsnTimeout(_)), + ) => { + // Important that we do not interpret a shutdown error as "not found" and thereby + // reset the map. + return Err(e.into()); + } + // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but + // the original code assumes all other errors are missing keys. Therefore, we keep the code path + // the same for now, though in theory, we should only match the `MissingKey` variant. + Err( + PageReconstructError::Other(_) + | PageReconstructError::WalRedo(_) + | PageReconstructError::MissingKey { .. }, + ) => { + // Key is missing, we must insert an image as the basis for subsequent deltas. + + let mut dir = AuxFilesDirectory { + files: HashMap::new(), + }; + dir.upsert(file_path, content); + self.put( + AUX_FILES_KEY, + Value::Image(Bytes::from( + AuxFilesDirectory::ser(&dir).context("serialize")?, + )), + ); + n_files = 1; + aux_files.dir = Some(dir); + } } } - } - self.pending_directory_entries - .push((DirectoryKind::AuxFiles, n_files)); + self.pending_directory_entries + .push((DirectoryKind::AuxFiles, n_files)); + } Ok(()) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1d483af278..010e56a899 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3758,7 +3758,7 @@ pub(crate) mod harness { image_layer_creation_check_threshold: Some( tenant_conf.image_layer_creation_check_threshold, ), - switch_to_aux_file_v2: Some(tenant_conf.switch_to_aux_file_v2), + switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy), } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 9975c9edbc..a743ce3c16 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -9,6 +9,7 @@ //! may lead to a data loss. //! use anyhow::bail; +use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::CompactionAlgorithm; use pageserver_api::models::EvictionPolicy; use pageserver_api::models::{self, ThrottleConfig}; @@ -370,9 +371,9 @@ pub struct TenantConf { // Expresed in multiples of checkpoint distance. pub image_layer_creation_check_threshold: u8, - /// Switch to aux file v2. Switching this flag requires the user has not written any aux file into + /// Switch to a new aux file policy. Switching this flag requires the user has not written any aux file into /// the storage before, and this flag cannot be switched back. Otherwise there will be data corruptions. 
-    pub switch_to_aux_file_v2: bool,
+    pub switch_aux_file_policy: AuxFilePolicy,
 }
 
 /// Same as TenantConf, but this struct preserves the information about
@@ -471,7 +472,7 @@ pub struct TenantConfOpt {
 
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
-    pub switch_to_aux_file_v2: Option<bool>,
+    pub switch_aux_file_policy: Option<AuxFilePolicy>,
 }
 
 impl TenantConfOpt {
@@ -529,9 +530,9 @@ impl TenantConfOpt {
             image_layer_creation_check_threshold: self
                 .image_layer_creation_check_threshold
                 .unwrap_or(global_conf.image_layer_creation_check_threshold),
-            switch_to_aux_file_v2: self
-                .switch_to_aux_file_v2
-                .unwrap_or(global_conf.switch_to_aux_file_v2),
+            switch_aux_file_policy: self
+                .switch_aux_file_policy
+                .unwrap_or(global_conf.switch_aux_file_policy),
         }
     }
 }
@@ -573,7 +574,7 @@ impl Default for TenantConf {
             lazy_slru_download: false,
             timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
             image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
-            switch_to_aux_file_v2: false,
+            switch_aux_file_policy: AuxFilePolicy::V1,
         }
     }
 }
@@ -648,7 +649,7 @@ impl From<TenantConf> for models::TenantConfig {
             lazy_slru_download: value.lazy_slru_download,
             timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
             image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
-            switch_to_aux_file_v2: value.switch_to_aux_file_v2,
+            switch_aux_file_policy: value.switch_aux_file_policy,
         }
     }
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 1c417262b0..7213ff8f75 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -23,8 +23,9 @@ use pageserver_api::{
     },
     keyspace::{KeySpaceAccum, SparseKeyPartitioning},
     models::{
-        CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
-        EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState,
+        AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
+        DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
+        TimelineState,
     },
     reltag::BlockNumber,
     shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -863,9 +864,13 @@ impl Timeline {
         // Initialise the reconstruct state for the key with the cache
         // entry returned above.
         let mut reconstruct_state = ValuesReconstructState::new();
-        let mut key_state = VectoredValueReconstructState::default();
-        key_state.img = cached_page_img;
-        reconstruct_state.keys.insert(key, Ok(key_state));
+
+        // Only add the cached image to the reconstruct state when it exists.
+        if cached_page_img.is_some() {
+            let mut key_state = VectoredValueReconstructState::default();
+            key_state.img = cached_page_img;
+            reconstruct_state.keys.insert(key, Ok(key_state));
+        }
 
         let vectored_res = self
             .get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
@@ -1077,7 +1082,7 @@ impl Timeline {
         // We should generalize this into Keyspace::contains in the future.
for range in &keyspace.ranges { if range.start.field1 < METADATA_KEY_BEGIN_PREFIX - || range.end.field1 >= METADATA_KEY_END_PREFIX + || range.end.field1 > METADATA_KEY_END_PREFIX { return Err(GetVectoredError::Other(anyhow::anyhow!( "only metadata keyspace can be scanned" @@ -1991,13 +1996,12 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; // Private functions impl Timeline { - #[allow(dead_code)] - pub(crate) fn get_switch_to_aux_file_v2(&self) -> bool { + pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy { let tenant_conf = self.tenant_conf.load(); tenant_conf .tenant_conf - .switch_to_aux_file_v2 - .unwrap_or(self.conf.default_tenant_conf.switch_to_aux_file_v2) + .switch_aux_file_policy + .unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy) } pub(crate) fn get_lazy_slru_download(&self) -> bool { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 82f17fe20d..fc66822eb9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -82,6 +82,7 @@ from fixtures.utils import ( subprocess_capture, wait_until, ) +from fixtures.utils import AuxFileStore as AuxFileStore # reexport """ This file contains pytest fixtures. A fixture is a test resource that can be @@ -465,6 +466,7 @@ class NeonEnvBuilder: initial_tenant: Optional[TenantId] = None, initial_timeline: Optional[TimelineId] = None, pageserver_virtual_file_io_engine: Optional[str] = None, + pageserver_aux_file_policy: Optional[AuxFileStore] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -520,6 +522,8 @@ class NeonEnvBuilder: self.pageserver_validate_vectored_get = bool(validate) log.debug(f'Overriding pageserver validate_vectored_get config to "{validate}"') + self.pageserver_aux_file_policy = pageserver_aux_file_policy + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -565,6 +569,7 @@ class NeonEnvBuilder: timeline_id=env.initial_timeline, shard_count=initial_tenant_shard_count, shard_stripe_size=initial_tenant_shard_stripe_size, + aux_file_v2=self.pageserver_aux_file_policy, ) assert env.initial_tenant == initial_tenant assert env.initial_timeline == initial_timeline @@ -1047,6 +1052,7 @@ class NeonEnv: ) self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine + self.pageserver_aux_file_policy = config.pageserver_aux_file_policy # Create a config file corresponding to the options cfg: Dict[str, Any] = { @@ -1283,6 +1289,7 @@ def _shared_simple_env( pg_distrib_dir: Path, pg_version: PgVersion, pageserver_virtual_file_io_engine: str, + pageserver_aux_file_policy: Optional[AuxFileStore], ) -> Iterator[NeonEnv]: """ # Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES @@ -1313,6 +1320,7 @@ def _shared_simple_env( test_name=request.node.name, test_output_dir=test_output_dir, pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, + pageserver_aux_file_policy=pageserver_aux_file_policy, ) as builder: env = builder.init_start() @@ -1352,6 +1360,7 @@ def neon_env_builder( test_overlay_dir: Path, top_output_dir: Path, pageserver_virtual_file_io_engine: str, + pageserver_aux_file_policy: Optional[AuxFileStore] = None, ) -> Iterator[NeonEnvBuilder]: """ Fixture to create a Neon environment for test. 
@@ -1385,6 +1394,7 @@ def neon_env_builder( test_name=request.node.name, test_output_dir=test_output_dir, test_overlay_dir=test_overlay_dir, + pageserver_aux_file_policy=pageserver_aux_file_policy, ) as builder: yield builder @@ -1544,6 +1554,7 @@ class NeonCli(AbstractNeonCli): shard_stripe_size: Optional[int] = None, placement_policy: Optional[str] = None, set_default: bool = False, + aux_file_v2: Optional[AuxFileStore] = None, ) -> Tuple[TenantId, TimelineId]: """ Creates a new tenant, returns its id and its initial timeline's id. @@ -1567,6 +1578,16 @@ class NeonCli(AbstractNeonCli): product(["-c"], (f"{key}:{value}" for key, value in conf.items())) ) ) + + if aux_file_v2 is AuxFileStore.V2: + args.extend(["-c", "switch_aux_file_policy:v2"]) + + if aux_file_v2 is AuxFileStore.V1: + args.extend(["-c", "switch_aux_file_policy:v1"]) + + if aux_file_v2 is AuxFileStore.CrossValidation: + args.extend(["-c", "switch_aux_file_policy:cross_validation"]) + if set_default: args.append("--set-default") diff --git a/test_runner/fixtures/parametrize.py b/test_runner/fixtures/parametrize.py index c8ab550ad7..77523a542b 100644 --- a/test_runner/fixtures/parametrize.py +++ b/test_runner/fixtures/parametrize.py @@ -5,6 +5,7 @@ import pytest from _pytest.python import Metafunc from fixtures.pg_version import PgVersion +from fixtures.utils import AuxFileStore """ Dynamically parametrize tests by different parameters @@ -31,6 +32,11 @@ def pageserver_virtual_file_io_engine() -> Optional[str]: return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE") +@pytest.fixture(scope="function", autouse=True) +def pageserver_aux_file_policy() -> Optional[AuxFileStore]: + return None + + def pytest_generate_tests(metafunc: Metafunc): if (bt := os.getenv("BUILD_TYPE")) is None: build_types = ["debug", "release"] diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 9365d65fc9..6470621900 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -1,4 +1,5 @@ import contextlib +import enum import json import os import re @@ -484,3 +485,16 @@ def assert_no_errors(log_file, service, allowed_errors): log.info(f"not allowed {service} error: {error.strip()}") assert not errors, f"Log errors on {service}: {errors[0]}" + + +@enum.unique +class AuxFileStore(str, enum.Enum): + V1 = "V1" + V2 = "V2" + CrossValidation = "CrossValidation" + + def __repr__(self) -> str: + return f"'aux-{self.value}'" + + def __str__(self) -> str: + return f"'aux-{self.value}'" diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 59461cc095..693add422f 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -190,7 +190,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "trace_read_requests": True, "walreceiver_connect_timeout": "13m", "image_layer_creation_check_threshold": 1, - "switch_to_aux_file_v2": True, + "switch_aux_file_policy": "CrossValidation", } ps_http = env.pageserver.http_client() diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index 9b2abe608c..57d3447cae 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -6,6 +6,7 @@ from string import ascii_lowercase import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( + AuxFileStore, NeonEnv, NeonEnvBuilder, logical_replication_sync, @@ -19,6 +20,19 @@ def 
random_string(n: int): return "".join([choice(ascii_lowercase) for _ in range(n)]) +@pytest.mark.parametrize( + "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.V2, AuxFileStore.CrossValidation] +) +def test_aux_file_v2_flag(neon_simple_env: NeonEnv, pageserver_aux_file_policy: AuxFileStore): + env = neon_simple_env + with env.pageserver.http_client() as client: + tenant_config = client.tenant_config(env.initial_tenant).effective_config + assert pageserver_aux_file_policy == tenant_config["switch_aux_file_policy"] + + +@pytest.mark.parametrize( + "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation] +) def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg): env = neon_simple_env @@ -160,6 +174,9 @@ COMMIT; # Test that neon.logical_replication_max_snap_files works +@pytest.mark.parametrize( + "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation] +) def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg): def slot_removed(ep): assert ( @@ -281,6 +298,9 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of # Test compute start at LSN page of which starts with contrecord # https://github.com/neondatabase/neon/issues/5749 +@pytest.mark.parametrize( + "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation] +) def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg): env = neon_simple_env @@ -371,6 +391,9 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg): # logical replication bug as such, but without logical replication, # records passed ot the WAL redo process are never large enough to hit # the bug. +@pytest.mark.parametrize( + "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation] +) def test_large_records(neon_simple_env: NeonEnv, vanilla_pg): env = neon_simple_env @@ -442,6 +465,9 @@ def test_slots_and_branching(neon_simple_env: NeonEnv): ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')") +@pytest.mark.parametrize( + "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation] +) def test_replication_shutdown(neon_simple_env: NeonEnv): # Ensure Postgres can exit without stuck when a replication job is active + neon extension installed env = neon_simple_env
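Closing note: the aux file v2 write path added in pgdatadir_mapping.rs above is a read-modify-write over a single metadata key: decode the old value, drop any existing entry for the path, then re-append the new content unless it is empty (empty content acts as a delete). A standalone sketch of that merge rule (illustrative only; the real logic runs inside `DatadirModification::put_file` and stores the re-encoded value as a `Value::Image`):

    /// Merge one (path, content) update into a decoded file list, mirroring
    /// the filter/chain logic in `DatadirModification::put_file`: an empty
    /// `content` deletes the entry, anything else upserts it.
    fn merge_file<'a>(
        files: Vec<(&'a str, &'a [u8])>,
        path: &'a str,
        content: &'a [u8],
    ) -> Vec<(&'a str, &'a [u8])> {
        let kept = files.into_iter().filter(|(p, _)| *p != path);
        if content.is_empty() {
            kept.collect()
        } else {
            kept.chain(std::iter::once((path, content))).collect()
        }
    }

    fn main() {
        let files = vec![("pg_logical/1.file", b"1111".as_slice())];
        // Upsert a second file, then delete the first.
        let files = merge_file(files, "pg_logical/2.file", b"2222");
        let files = merge_file(files, "pg_logical/1.file", b"");
        assert_eq!(files, vec![("pg_logical/2.file", b"2222".as_slice())]);
    }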