diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 3ec9cac2c3..5b0b6bebe3 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -743,8 +743,6 @@ pub struct TimelineInfo {
     // Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
    // not deny unknown fields by default so it's safe to set the field to some value, though it won't be
    // read.
-    /// The last aux file policy being used on this timeline
-    pub last_aux_file_policy: Option<AuxFilePolicy>,
    pub is_archived: Option<bool>,
 }
 
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 36a6ed427b..e6663ef56f 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -18,7 +18,6 @@ use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
 use metrics::launch_timestamp::LaunchTimestamp;
 use pageserver_api::models::virtual_file::IoMode;
-use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
 use pageserver_api::models::IngestAuxFilesRequest;
 use pageserver_api::models::ListAuxFilesRequest;
@@ -474,8 +473,6 @@ async fn build_timeline_info_common(
         is_archived: Some(is_archived),
         walreceiver_status,
-
-        last_aux_file_policy: timeline.last_aux_file_policy.load(),
     };
     Ok(info)
 }
@@ -2399,31 +2396,6 @@ async fn post_tracing_event_handler(
     json_response(StatusCode::OK, ())
 }
 
-async fn force_aux_policy_switch_handler(
-    mut r: Request<Body>,
-    _cancel: CancellationToken,
-) -> Result<Response<Body>, ApiError> {
-    check_permission(&r, None)?;
-    let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
-    let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
-    let policy: AuxFilePolicy = json_request(&mut r).await?;
-
-    let state = get_state(&r);
-
-    let tenant = state
-        .tenant_manager
-        .get_attached_tenant_shard(tenant_shard_id)?;
-    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
-    let timeline =
-        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
-            .await?;
-    timeline
-        .do_switch_aux_policy(policy)
-        .map_err(ApiError::InternalServerError)?;
-
-    json_response(StatusCode::OK, ())
-}
-
 async fn put_io_engine_handler(
     mut r: Request<Body>,
     _cancel: CancellationToken,
@@ -3136,10 +3108,6 @@ pub fn make_router(
         )
         .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
         .put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
-        .put(
-            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
-            |r| api_handler(r, force_aux_policy_switch_handler),
-        )
         .get("/v1/utilization", |r| api_handler(r, get_utilization))
         .post(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
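Reviewer note: together with the `TimelineInfo` field above, this removes the last HTTP surface of the aux-file policy machinery (`force_aux_policy_switch` and its route). For orientation, a rough sketch of the enum those call sites referred to, reconstructed only from the variant names and comments visible in this diff — the real definition lived in `pageserver_api::models` and may have differed in detail:

    // Sketch, not the exact upstream definition.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum AuxFilePolicy {
        V1,              // legacy: a single AUX_FILES_KEY image plus WAL deltas
        V2,              // per-file keys in the sparse keyspace
        CrossValidation, // write both, read both, compare
    }

    impl AuxFilePolicy {
        // The removed put_file code documented the allowed switch paths as
        // "no aux files -> v1/v2/cross-validation" and "cross-validation -> v2".
        pub fn is_valid_migration_path(from: Option<Self>, to: Self) -> bool {
            matches!(
                (from, to),
                (None, _) | (Some(AuxFilePolicy::CrossValidation), AuxFilePolicy::V2)
            )
        }
    }
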
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 900da5beab..f2a11e65c1 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -22,7 +22,6 @@ use pageserver_api::key::{
     CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
 use pageserver_api::keyspace::SparseKeySpace;
-use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
@@ -33,7 +32,7 @@ use std::ops::ControlFlow;
 use std::ops::Range;
 use strum::IntoEnumIterator;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, trace, warn};
 use utils::bin_ser::DeserializeError;
 use utils::pausable_failpoint;
 use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -677,21 +676,6 @@ impl Timeline {
         self.get(CHECKPOINT_KEY, lsn, ctx).await
     }
 
-    async fn list_aux_files_v1(
-        &self,
-        lsn: Lsn,
-        ctx: &RequestContext,
-    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
-        match self.get(AUX_FILES_KEY, lsn, ctx).await {
-            Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
-            Err(e) => {
-                // This is expected: historical databases do not have the key.
-                debug!("Failed to get info about AUX files: {}", e);
-                Ok(HashMap::new())
-            }
-        }
-    }
-
     async fn list_aux_files_v2(
         &self,
         lsn: Lsn,
@@ -722,10 +706,7 @@ impl Timeline {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Result<(), PageReconstructError> {
-        let current_policy = self.last_aux_file_policy.load();
-        if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
-            self.list_aux_files_v2(lsn, ctx).await?;
-        }
+        self.list_aux_files_v2(lsn, ctx).await?;
         Ok(())
     }
 
@@ -734,51 +715,7 @@ impl Timeline {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
-        let current_policy = self.last_aux_file_policy.load();
-        match current_policy {
-            Some(AuxFilePolicy::V1) => {
-                let res = self.list_aux_files_v1(lsn, ctx).await?;
-                let empty_str = if res.is_empty() { ", empty" } else { "" };
-                warn!(
-                    "this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
-                );
-                Ok(res)
-            }
-            None => {
-                let res = self.list_aux_files_v1(lsn, ctx).await?;
-                if !res.is_empty() {
-                    warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
-                }
-                Ok(res)
-            }
-            Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
-            Some(AuxFilePolicy::CrossValidation) => {
-                let v1_result = self.list_aux_files_v1(lsn, ctx).await;
-                let v2_result = self.list_aux_files_v2(lsn, ctx).await;
-                match (v1_result, v2_result) {
-                    (Ok(v1), Ok(v2)) => {
-                        if v1 != v2 {
-                            tracing::error!(
-                                "unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
-                            );
-                            return Err(PageReconstructError::Other(anyhow::anyhow!(
-                                "unmatched aux file v1 v2 result"
-                            )));
-                        }
-                        Ok(v1)
-                    }
-                    (Ok(_), Err(v2)) => {
-                        tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
-                        Err(v2)
-                    }
-                    (Err(v1), Ok(_)) => {
-                        tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
-                        Err(v1)
-                    }
-                    (Err(_), Err(v2)) => Err(v2),
-                }
-            }
-        }
+        self.list_aux_files_v2(lsn, ctx).await
     }
 
     pub(crate) async fn get_replorigins(
@@ -954,9 +891,6 @@ impl Timeline {
         result.add_key(CONTROLFILE_KEY);
         result.add_key(CHECKPOINT_KEY);
-        if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
-            result.add_key(AUX_FILES_KEY);
-        }
 
         // Add extra keyspaces in the test cases. Some test cases write keys into the storage without
         // creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
@@ -1166,9 +1100,6 @@ impl<'a> DatadirModification<'a> {
         self.pending_directory_entries.push((DirectoryKind::Db, 0));
         self.put(DBDIR_KEY, Value::Image(buf.into()));
 
-        // Create AuxFilesDirectory
-        self.init_aux_dir()?;
-
         let buf = if self.tline.pg_version >= 17 {
             TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
                 xids: HashSet::new(),
@@ -1347,9 +1278,6 @@ impl<'a> DatadirModification<'a> {
             // 'true', now write the updated 'dbdirs' map back.
             let buf = DbDirectory::ser(&dbdir)?;
             self.put(DBDIR_KEY, Value::Image(buf.into()));
-
-            // Create AuxFilesDirectory as well
-            self.init_aux_dir()?;
         }
         if r.is_none() {
             // Create RelDirectory
@@ -1726,200 +1654,60 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
-    pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
-        if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
-            return Ok(());
-        }
-        let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
-            files: HashMap::new(),
-        })?;
-        self.pending_directory_entries
-            .push((DirectoryKind::AuxFiles, 0));
-        self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
-        Ok(())
-    }
-
     pub async fn put_file(
         &mut self,
         path: &str,
         content: &[u8],
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        let switch_policy = self.tline.get_switch_aux_file_policy();
-
-        let policy = {
-            let current_policy = self.tline.last_aux_file_policy.load();
-            // Allowed switch path:
-            // * no aux files -> v1/v2/cross-validation
-            // * cross-validation->v2
-
-            let current_policy = if current_policy.is_none() {
-                // This path will only be hit once per tenant: we will decide the final policy in this code block.
-                // The next call to `put_file` will always have `last_aux_file_policy != None`.
-                let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
-                let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
-                if aux_files_key_v1.is_empty() {
-                    None
-                } else {
-                    warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
-                    self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
-                    Some(AuxFilePolicy::V1)
-                }
-            } else {
-                current_policy
-            };
-
-            if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
-                self.tline.do_switch_aux_policy(switch_policy)?;
-                info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
-                switch_policy
-            } else {
-                // This branch handles non-valid migration path, and the case that switch_policy == current_policy.
-                // And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
-                current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
-            }
+        let key = aux_file::encode_aux_file_key(path);
+        // retrieve the key from the engine
+        let old_val = match self.get(key, ctx).await {
+            Ok(val) => Some(val),
+            Err(PageReconstructError::MissingKey(_)) => None,
+            Err(e) => return Err(e.into()),
         };
-
-        if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
-            let key = aux_file::encode_aux_file_key(path);
-            // retrieve the key from the engine
-            let old_val = match self.get(key, ctx).await {
-                Ok(val) => Some(val),
-                Err(PageReconstructError::MissingKey(_)) => None,
-                Err(e) => return Err(e.into()),
-            };
-            let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
-                aux_file::decode_file_value(old_val)?
+        let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
+            aux_file::decode_file_value(old_val)?
+        } else {
+            Vec::new()
+        };
+        let mut other_files = Vec::with_capacity(files.len());
+        let mut modifying_file = None;
+        for file @ (p, content) in files {
+            if path == p {
+                assert!(
+                    modifying_file.is_none(),
+                    "duplicated entries found for {}",
+                    path
+                );
+                modifying_file = Some(content);
             } else {
-                Vec::new()
-            };
-            let mut other_files = Vec::with_capacity(files.len());
-            let mut modifying_file = None;
-            for file @ (p, content) in files {
-                if path == p {
-                    assert!(
-                        modifying_file.is_none(),
-                        "duplicated entries found for {}",
-                        path
-                    );
-                    modifying_file = Some(content);
-                } else {
-                    other_files.push(file);
-                }
+                other_files.push(file);
             }
-            let mut new_files = other_files;
-            match (modifying_file, content.is_empty()) {
-                (Some(old_content), false) => {
-                    self.tline
-                        .aux_file_size_estimator
-                        .on_update(old_content.len(), content.len());
-                    new_files.push((path, content));
-                }
-                (Some(old_content), true) => {
-                    self.tline
-                        .aux_file_size_estimator
-                        .on_remove(old_content.len());
-                    // not adding the file key to the final `new_files` vec.
-                }
-                (None, false) => {
-                    self.tline.aux_file_size_estimator.on_add(content.len());
-                    new_files.push((path, content));
-                }
-                (None, true) => warn!("removing non-existing aux file: {}", path),
-            }
-            let new_val = aux_file::encode_file_value(&new_files)?;
-            self.put(key, Value::Image(new_val.into()));
         }
-
-        if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
-            let file_path = path.to_string();
-            let content = if content.is_empty() {
-                None
-            } else {
-                Some(Bytes::copy_from_slice(content))
-            };
-
-            let n_files;
-            let mut aux_files = self.tline.aux_files.lock().await;
-            if let Some(mut dir) = aux_files.dir.take() {
-                // We already updated aux files in `self`: emit a delta and update our latest value.
-                dir.upsert(file_path.clone(), content.clone());
-                n_files = dir.files.len();
-                if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
-                    self.put(
-                        AUX_FILES_KEY,
-                        Value::Image(Bytes::from(
-                            AuxFilesDirectory::ser(&dir).context("serialize")?,
-                        )),
-                    );
-                    aux_files.n_deltas = 0;
-                } else {
-                    self.put(
-                        AUX_FILES_KEY,
-                        Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
-                    );
-                    aux_files.n_deltas += 1;
-                }
-                aux_files.dir = Some(dir);
-            } else {
-                // Check if the AUX_FILES_KEY is initialized
-                match self.get(AUX_FILES_KEY, ctx).await {
-                    Ok(dir_bytes) => {
-                        let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
-                        // Key is already set, we may append a delta
-                        self.put(
-                            AUX_FILES_KEY,
-                            Value::WalRecord(NeonWalRecord::AuxFile {
-                                file_path: file_path.clone(),
-                                content: content.clone(),
-                            }),
-                        );
-                        dir.upsert(file_path, content);
-                        n_files = dir.files.len();
-                        aux_files.dir = Some(dir);
-                    }
-                    Err(
-                        e @ (PageReconstructError::Cancelled
-                        | PageReconstructError::AncestorLsnTimeout(_)),
-                    ) => {
-                        // Important that we do not interpret a shutdown error as "not found" and thereby
-                        // reset the map.
-                        return Err(e.into());
-                    }
-                    // Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
-                    // the original code assumes all other errors are missing keys. Therefore, we keep the code path
-                    // the same for now, though in theory, we should only match the `MissingKey` variant.
-                    Err(
-                        e @ (PageReconstructError::Other(_)
-                        | PageReconstructError::WalRedo(_)
-                        | PageReconstructError::MissingKey(_)),
-                    ) => {
-                        // Key is missing, we must insert an image as the basis for subsequent deltas.
-
-                        if !matches!(e, PageReconstructError::MissingKey(_)) {
-                            let e = utils::error::report_compact_sources(&e);
-                            tracing::warn!("treating error as if it was a missing key: {}", e);
-                        }
-
-                        let mut dir = AuxFilesDirectory {
-                            files: HashMap::new(),
-                        };
-                        dir.upsert(file_path, content);
-                        self.put(
-                            AUX_FILES_KEY,
-                            Value::Image(Bytes::from(
-                                AuxFilesDirectory::ser(&dir).context("serialize")?,
-                            )),
-                        );
-                        n_files = 1;
-                        aux_files.dir = Some(dir);
-                    }
-                }
+        let mut new_files = other_files;
+        match (modifying_file, content.is_empty()) {
+            (Some(old_content), false) => {
+                self.tline
+                    .aux_file_size_estimator
+                    .on_update(old_content.len(), content.len());
+                new_files.push((path, content));
             }
-
-            self.pending_directory_entries
-                .push((DirectoryKind::AuxFiles, n_files));
+            (Some(old_content), true) => {
+                self.tline
+                    .aux_file_size_estimator
+                    .on_remove(old_content.len());
+                // not adding the file key to the final `new_files` vec.
+            }
+            (None, false) => {
+                self.tline.aux_file_size_estimator.on_add(content.len());
+                new_files.push((path, content));
+            }
+            (None, true) => warn!("removing non-existing aux file: {}", path),
         }
+        let new_val = aux_file::encode_file_value(&new_files)?;
+        self.put(key, Value::Image(new_val.into()));
 
         Ok(())
     }
 
@@ -2089,12 +1877,6 @@ impl<'a> DatadirModification<'a> {
         self.tline.get(key, lsn, ctx).await
     }
 
-    /// Only used during unit tests, force putting a key into the modification.
-    #[cfg(test)]
-    pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
-        self.put(key, val);
-    }
-
     fn put(&mut self, key: Key, val: Value) {
         if Self::is_data_key(&key) {
             self.put_data(key.to_compact(), val)
@@ -2212,21 +1994,6 @@ struct RelDirectory {
     rels: HashSet<(Oid, u8)>,
 }
 
-#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
-pub(crate) struct AuxFilesDirectory {
-    pub(crate) files: HashMap<String, Bytes>,
-}
-
-impl AuxFilesDirectory {
-    pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
-        if let Some(value) = value {
-            self.files.insert(key, value);
-        } else {
-            self.files.remove(&key);
-        }
-    }
-}
-
 #[derive(Debug, Serialize, Deserialize)]
 struct RelSizeEntry {
     nblocks: u32,
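Reviewer note: `put_file` is now a single read-modify-write of one hashed aux key, as the hunk above shows. Because `encode_aux_file_key` derives the key by hashing the path, several files can share one key, which is why the stored value is an encoded *list* of `(path, content)` pairs. A standalone illustration of that value layout — the framing here is invented to show the idea, it is not the pageserver's actual wire format:

    // Hypothetical length-prefixed framing for a multi-file aux value.
    fn encode_file_value(files: &[(&str, &[u8])]) -> Vec<u8> {
        let mut buf = Vec::new();
        buf.push(files.len() as u8); // assumes < 256 colliding files per key
        for (path, content) in files {
            buf.extend_from_slice(&(path.len() as u32).to_be_bytes());
            buf.extend_from_slice(path.as_bytes());
            buf.extend_from_slice(&(content.len() as u32).to_be_bytes());
            buf.extend_from_slice(content);
        }
        buf
    }
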
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index baa2365658..1066d165cd 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -20,7 +20,6 @@ use enumset::EnumSet;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use pageserver_api::models;
-use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::models::LsnLease;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::models::TimelineState;
@@ -800,7 +799,6 @@ impl Tenant {
         index_part: Option<IndexPart>,
         metadata: TimelineMetadata,
         ancestor: Option<Arc<Timeline>>,
-        last_aux_file_policy: Option<AuxFilePolicy>,
         _ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let tenant_id = self.tenant_shard_id;
@@ -811,10 +809,6 @@ impl Tenant {
             ancestor.clone(),
             resources,
             CreateTimelineCause::Load,
-            // This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
-            // there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
-            // Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
-            last_aux_file_policy,
         )?;
         let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
         anyhow::ensure!(
@@ -829,10 +823,6 @@ impl Tenant {
 
         if let Some(index_part) = index_part.as_ref() {
             timeline.remote_client.init_upload_queue(index_part)?;
-
-            timeline
-                .last_aux_file_policy
-                .store(index_part.last_aux_file_policy());
         } else {
             // No data on the remote storage, but we have local metadata file. We can end up
             // here with timeline_create being interrupted before finishing index part upload.
@@ -1403,15 +1393,12 @@ impl Tenant {
             None
         };
 
-        let last_aux_file_policy = index_part.last_aux_file_policy();
-
         self.timeline_init_and_sync(
             timeline_id,
             resources,
             Some(index_part),
             remote_metadata,
             ancestor,
-            last_aux_file_policy,
             ctx,
         )
         .await
@@ -1824,7 +1811,6 @@ impl Tenant {
             create_guard,
             initdb_lsn,
             None,
-            None,
         )
         .await
     }
@@ -3032,7 +3018,6 @@ impl Tenant {
         ancestor: Option<Arc<Timeline>>,
         resources: TimelineResources,
         cause: CreateTimelineCause,
-        last_aux_file_policy: Option<AuxFilePolicy>,
     ) -> anyhow::Result<Arc<Timeline>> {
         let state = match cause {
             CreateTimelineCause::Load => {
@@ -3061,7 +3046,6 @@ impl Tenant {
             resources,
             pg_version,
             state,
-            last_aux_file_policy,
             self.attach_wal_lag_cooldown.clone(),
             self.cancel.child_token(),
         );
@@ -3720,7 +3704,6 @@ impl Tenant {
                 timeline_create_guard,
                 start_lsn + 1,
                 Some(Arc::clone(src_timeline)),
-                src_timeline.last_aux_file_policy.load(),
             )
             .await?;
 
@@ -3914,7 +3897,6 @@ impl Tenant {
             timeline_create_guard,
             pgdata_lsn,
             None,
-            None,
         )
         .await?;
 
@@ -3986,7 +3968,6 @@ impl Tenant {
         create_guard: TimelineCreateGuard<'a>,
         start_lsn: Lsn,
         ancestor: Option<Arc<Timeline>>,
-        last_aux_file_policy: Option<AuxFilePolicy>,
     ) -> anyhow::Result<Arc<Timeline>> {
         let tenant_shard_id = self.tenant_shard_id;
 
@@ -4002,7 +3983,6 @@ impl Tenant {
             ancestor,
             resources,
             CreateTimelineCause::Load,
-            last_aux_file_policy,
         )
         .context("Failed to create timeline data structure")?;
 
@@ -4600,7 +4580,6 @@ mod tests {
     use super::*;
     use crate::keyspace::KeySpaceAccum;
-    use crate::pgdatadir_mapping::AuxFilesDirectory;
     use crate::repository::{Key, Value};
     use crate::tenant::harness::*;
     use crate::tenant::timeline::CompactFlags;
@@ -4609,7 +4588,7 @@ mod tests {
     use bytes::{Bytes, BytesMut};
     use hex_literal::hex;
     use itertools::Itertools;
-    use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
+    use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
     use pageserver_api::keyspace::KeySpace;
     use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
     use rand::{thread_rng, Rng};
@@ -4618,7 +4597,6 @@ mod tests {
     use tests::timeline::{GetVectoredError, ShutdownMode};
     use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
     use timeline::{DeltaLayerTestDesc, GcInfo};
-    use utils::bin_ser::BeSer;
    use utils::id::TenantId;
 
     static TEST_KEY: Lazy<Key> =
@@ -6422,16 +6400,9 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_branch_copies_dirty_aux_file_flag() {
-        let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
-            .await
-            .unwrap();
+    async fn test_aux_file_e2e() {
+        let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
 
-        // the default aux file policy to switch is v2 if not set by the admins
-        assert_eq!(
-            harness.tenant_conf.switch_aux_file_policy,
-            AuxFilePolicy::default_tenant_config()
-        );
         let (tenant, ctx) = harness.load().await;
 
         let mut lsn = Lsn(0x08);
@@ -6441,9 +6412,6 @@ mod tests {
         let tline = tenant
             .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
             .await
             .unwrap();
 
-        // no aux file is written at this point, so the persistent flag should be unset
-        assert_eq!(tline.last_aux_file_policy.load(), None);
-
         {
             lsn += 8;
             let mut modification = tline.begin_modification(lsn);
             modification
                 .put_file("pg_logical/mappings/test1", b"first", &ctx)
                 .await
                 .unwrap();
             modification.commit(&ctx).await.unwrap();
         }
 
@@ -6454,30 +6422,6 @@ mod tests {
-        // there is no tenant manager to pass the configuration through, so lets mimic it
-        tenant.set_new_location_config(
-            AttachedTenantConf::try_from(LocationConf::attached_single(
-                TenantConfOpt {
-                    switch_aux_file_policy: Some(AuxFilePolicy::V2),
-                    ..Default::default()
-                },
-                tenant.generation,
-                &pageserver_api::models::ShardParameters::default(),
-            ))
-            .unwrap(),
-        );
-
-        assert_eq!(
-            tline.get_switch_aux_file_policy(),
-            AuxFilePolicy::V2,
-            "wanted state has been updated"
-        );
-        assert_eq!(
-            tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::V2),
-            "aux file is written with switch_aux_file_policy unset (which is v2), so we should use v2 there"
-        );
-
         // we can read everything from the storage
         let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
         assert_eq!(
             files.get("pg_logical/mappings/test1"),
@@ -6495,12 +6439,6 @@ mod tests {
             modification.commit(&ctx).await.unwrap();
         }
 
-        assert_eq!(
-            tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::V2),
-            "keep v2 storage format when new files are written"
-        );
-
         let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
         assert_eq!(
             files.get("pg_logical/mappings/test2"),
             Some(&bytes::Bytes::from_static(b"second"))
         );
@@ -6512,321 +6450,9 @@ mod tests {
             .await
             .unwrap();
 
-        // child copies the last flag even if that is not on remote storage yet
-        assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2);
-        assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
-
         let files = child.list_aux_files(lsn, &ctx).await.unwrap();
         assert_eq!(files.get("pg_logical/mappings/test1"), None);
         assert_eq!(files.get("pg_logical/mappings/test2"), None);
-
-        // even if we crash here without flushing parent timeline with it's new
-        // last_aux_file_policy we are safe, because child was never meant to access ancestor's
-        // files. the ancestor can even switch back to V1 because of a migration safely.
-    }
-
-    #[tokio::test]
-    async fn aux_file_policy_switch() {
-        let mut harness = TenantHarness::create("aux_file_policy_switch")
-            .await
-            .unwrap();
-        harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
-        let (tenant, ctx) = harness.load().await;
-
-        let mut lsn = Lsn(0x08);
-
-        let tline: Arc<Timeline> = tenant
-            .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
-            .await
-            .unwrap();
-
-        assert_eq!(
-            tline.last_aux_file_policy.load(),
-            None,
-            "no aux file is written so it should be unset"
-        );
-
-        {
-            lsn += 8;
-            let mut modification = tline.begin_modification(lsn);
-            modification
-                .put_file("pg_logical/mappings/test1", b"first", &ctx)
-                .await
-                .unwrap();
-            modification.commit(&ctx).await.unwrap();
-        }
-
-        // there is no tenant manager to pass the configuration through, so lets mimic it
-        tenant.set_new_location_config(
-            AttachedTenantConf::try_from(LocationConf::attached_single(
-                TenantConfOpt {
-                    switch_aux_file_policy: Some(AuxFilePolicy::V2),
-                    ..Default::default()
-                },
-                tenant.generation,
-                &pageserver_api::models::ShardParameters::default(),
-            ))
-            .unwrap(),
-        );
-
-        assert_eq!(
-            tline.get_switch_aux_file_policy(),
-            AuxFilePolicy::V2,
-            "wanted state has been updated"
-        );
-        assert_eq!(
-            tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::CrossValidation),
-            "dirty index_part.json reflected state is yet to be updated"
-        );
-
-        // we can still read the auxfile v1 before we ingest anything new
-        let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
-        assert_eq!(
-            files.get("pg_logical/mappings/test1"),
-            Some(&bytes::Bytes::from_static(b"first"))
-        );
-
-        {
-            lsn += 8;
-            let mut modification = tline.begin_modification(lsn);
-            modification
-                .put_file("pg_logical/mappings/test2", b"second", &ctx)
-                .await
-                .unwrap();
-            modification.commit(&ctx).await.unwrap();
-        }
-
-        assert_eq!(
-            tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::V2),
-            "ingesting a file should apply the wanted switch state when applicable"
-        );
-
-        let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
-        assert_eq!(
-            files.get("pg_logical/mappings/test1"),
-            Some(&bytes::Bytes::from_static(b"first")),
-            "cross validation writes to both v1 and v2 so this should be available in v2"
-        );
-        assert_eq!(
-            files.get("pg_logical/mappings/test2"),
-            Some(&bytes::Bytes::from_static(b"second"))
-        );
-
-        // mimic again by trying to flip it from V2 to V1 (not switched to while ingesting a file)
-        tenant.set_new_location_config(
-            AttachedTenantConf::try_from(LocationConf::attached_single(
-                TenantConfOpt {
-                    switch_aux_file_policy: Some(AuxFilePolicy::V1),
-                    ..Default::default()
-                },
-                tenant.generation,
-                &pageserver_api::models::ShardParameters::default(),
-            ))
-            .unwrap(),
-        );
-
-        {
-            lsn += 8;
-            let mut modification = tline.begin_modification(lsn);
-            modification
-                .put_file("pg_logical/mappings/test2", b"third", &ctx)
-                .await
-                .unwrap();
-            modification.commit(&ctx).await.unwrap();
-        }
-
-        assert_eq!(
-            tline.get_switch_aux_file_policy(),
-            AuxFilePolicy::V1,
-            "wanted state has been updated again, even if invalid request"
-        );
-
-        assert_eq!(
-            tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::V2),
-            "ingesting a file should apply the wanted switch state when applicable"
-        );
-
-        let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
-        assert_eq!(
-            files.get("pg_logical/mappings/test1"),
-            Some(&bytes::Bytes::from_static(b"first"))
-        );
-        assert_eq!(
-            files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third")) - ); - - // mimic again by trying to flip it from from V1 to V2 (not switched to while ingesting a file) - tenant.set_new_location_config( - AttachedTenantConf::try_from(LocationConf::attached_single( - TenantConfOpt { - switch_aux_file_policy: Some(AuxFilePolicy::V2), - ..Default::default() - }, - tenant.generation, - &pageserver_api::models::ShardParameters::default(), - )) - .unwrap(), - ); - - { - lsn += 8; - let mut modification = tline.begin_modification(lsn); - modification - .put_file("pg_logical/mappings/test3", b"last", &ctx) - .await - .unwrap(); - modification.commit(&ctx).await.unwrap(); - } - - assert_eq!(tline.get_switch_aux_file_policy(), AuxFilePolicy::V2); - - assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2)); - - let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); - assert_eq!( - files.get("pg_logical/mappings/test1"), - Some(&bytes::Bytes::from_static(b"first")) - ); - assert_eq!( - files.get("pg_logical/mappings/test2"), - Some(&bytes::Bytes::from_static(b"third")) - ); - assert_eq!( - files.get("pg_logical/mappings/test3"), - Some(&bytes::Bytes::from_static(b"last")) - ); - } - - #[tokio::test] - async fn aux_file_policy_force_switch() { - let mut harness = TenantHarness::create("aux_file_policy_force_switch") - .await - .unwrap(); - harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1; - let (tenant, ctx) = harness.load().await; - - let mut lsn = Lsn(0x08); - - let tline: Arc = tenant - .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) - .await - .unwrap(); - - assert_eq!( - tline.last_aux_file_policy.load(), - None, - "no aux file is written so it should be unset" - ); - - { - lsn += 8; - let mut modification = tline.begin_modification(lsn); - modification - .put_file("pg_logical/mappings/test1", b"first", &ctx) - .await - .unwrap(); - modification.commit(&ctx).await.unwrap(); - } - - tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap(); - - assert_eq!( - tline.last_aux_file_policy.load(), - Some(AuxFilePolicy::V2), - "dirty index_part.json reflected state is yet to be updated" - ); - - // lose all data from v1 - let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); - assert_eq!(files.get("pg_logical/mappings/test1"), None); - - { - lsn += 8; - let mut modification = tline.begin_modification(lsn); - modification - .put_file("pg_logical/mappings/test2", b"second", &ctx) - .await - .unwrap(); - modification.commit(&ctx).await.unwrap(); - } - - // read data ingested in v2 - let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); - assert_eq!( - files.get("pg_logical/mappings/test2"), - Some(&bytes::Bytes::from_static(b"second")) - ); - // lose all data from v1 - assert_eq!(files.get("pg_logical/mappings/test1"), None); - } - - #[tokio::test] - async fn aux_file_policy_auto_detect() { - let mut harness = TenantHarness::create("aux_file_policy_auto_detect") - .await - .unwrap(); - harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode - let (tenant, ctx) = harness.load().await; - - let mut lsn = Lsn(0x08); - - let tline: Arc = tenant - .create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx) - .await - .unwrap(); - - assert_eq!( - tline.last_aux_file_policy.load(), - None, - "no aux file is written so it should be unset" - ); - - { - lsn += 8; - let mut modification = tline.begin_modification(lsn); - let buf = AuxFilesDirectory::ser(&AuxFilesDirectory { - files: vec![( - "test_file".to_string(), - 
Bytes::copy_from_slice(b"test_file"), - )] - .into_iter() - .collect(), - }) - .unwrap(); - modification.put_for_test(AUX_FILES_KEY, Value::Image(Bytes::from(buf))); - modification.commit(&ctx).await.unwrap(); - } - - { - lsn += 8; - let mut modification = tline.begin_modification(lsn); - modification - .put_file("pg_logical/mappings/test1", b"first", &ctx) - .await - .unwrap(); - modification.commit(&ctx).await.unwrap(); - } - - assert_eq!( - tline.last_aux_file_policy.load(), - Some(AuxFilePolicy::V1), - "keep using v1 because there are aux files writting with v1" - ); - - // we can still read the auxfile v1 - let files = tline.list_aux_files(lsn, &ctx).await.unwrap(); - assert_eq!( - files.get("pg_logical/mappings/test1"), - Some(&bytes::Bytes::from_static(b"first")) - ); - assert_eq!( - files.get("test_file"), - Some(&bytes::Bytes::from_static(b"test_file")) - ); } #[tokio::test] diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1f9ae40af5..5e9702bd3d 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -187,7 +187,7 @@ use camino::Utf8Path; use chrono::{NaiveDateTime, Utc}; pub(crate) use download::download_initdb_tar_zst; -use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState}; +use pageserver_api::models::TimelineArchivalState; use pageserver_api::shard::{ShardIndex, TenantShardId}; use scopeguard::ScopeGuard; use tokio_util::sync::CancellationToken; @@ -628,18 +628,6 @@ impl RemoteTimelineClient { Ok(()) } - /// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated. - pub(crate) fn schedule_index_upload_for_aux_file_policy_update( - self: &Arc, - last_aux_file_policy: Option, - ) -> anyhow::Result<()> { - let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard.initialized_mut()?; - upload_queue.dirty.last_aux_file_policy = last_aux_file_policy; - self.schedule_index_upload(upload_queue)?; - Ok(()) - } - /// Launch an index-file upload operation in the background, with only the `archived_at` field updated. /// /// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded, diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index c51ff54919..3a74a4ed11 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -133,10 +133,6 @@ impl IndexPart { pub(crate) fn example() -> Self { Self::empty(TimelineMetadata::example()) } - - pub(crate) fn last_aux_file_policy(&self) -> Option { - self.last_aux_file_policy - } } /// Metadata gathered for each of the layer files. 
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 2b4f949c76..d67a139dfa 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -28,9 +28,9 @@ use pageserver_api::{
     },
     keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
     models::{
-        AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, CompactionAlgorithmSettings,
-        DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
-        InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
+        CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
+        DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
+        LsnLease, TimelineState,
     },
     reltag::BlockNumber,
     shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -98,12 +98,12 @@ use crate::{
 use crate::{
     metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
 };
-use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
-use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
 use crate::{
-    pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
+    pgdatadir_mapping::DirectoryKind,
     virtual_file::{MaybeFatalIo, VirtualFile},
 };
+use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
+use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
 use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
 
 use crate::config::PageServerConf;
@@ -206,11 +206,6 @@ pub struct TimelineResources {
     pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
 }
 
-pub(crate) struct AuxFilesState {
-    pub(crate) dir: Option<AuxFilesDirectory>,
-    pub(crate) n_deltas: usize,
-}
-
 /// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
 /// ingestion considerably, because WAL ingestion needs to check on most records if the record
 /// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
@@ -413,15 +408,9 @@ pub struct Timeline {
     timeline_get_throttle: Arc<crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
 
-    /// Keep aux directory cache to avoid it's reconstruction on each update
-    pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
-
     /// Size estimator for aux file v2
     pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
 
-    /// Indicate whether aux file v2 storage is enabled.
-    pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
-
     /// Some test cases directly place keys into the timeline without actually modifying the directory
     /// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
     /// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
@@ -2012,14 +2001,6 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
     }
 
-    pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
-        let tenant_conf = self.tenant_conf.load();
-        tenant_conf
-            .tenant_conf
-            .switch_aux_file_policy
-            .unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy)
-    }
-
     pub(crate) fn get_lazy_slru_download(&self) -> bool {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
@@ -2152,7 +2133,6 @@ impl Timeline {
         resources: TimelineResources,
         pg_version: u32,
         state: TimelineState,
-        aux_file_policy: Option<AuxFilePolicy>,
         attach_wal_lag_cooldown: Arc<std::sync::OnceLock<WalLagCooldown>>,
         cancel: CancellationToken,
     ) -> Arc<Self> {
@@ -2282,15 +2262,8 @@ impl Timeline {
 
             timeline_get_throttle: resources.timeline_get_throttle,
 
-            aux_files: tokio::sync::Mutex::new(AuxFilesState {
-                dir: None,
-                n_deltas: 0,
-            }),
-
             aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
 
-            last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
-
             #[cfg(test)]
             extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
 
@@ -2301,10 +2274,6 @@ impl Timeline {
             attach_wal_lag_cooldown,
         };
 
-        if aux_file_policy == Some(AuxFilePolicy::V1) {
-            warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
-        }
-
         result.repartition_threshold =
             result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4479,14 +4448,6 @@ impl Timeline {
     ) -> Result<(), detach_ancestor::Error> {
         detach_ancestor::complete(self, tenant, attempt, ctx).await
     }
-
-    /// Switch aux file policy and schedule upload to the index part.
-    pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
-        self.last_aux_file_policy.store(Some(policy));
-        self.remote_client
-            .schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
-        Ok(())
-    }
 }
 
 impl Drop for Timeline {
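Reviewer note: after these hunks, the only aux-related state left on `Timeline` is `aux_file_size_estimator`. A toy model of the contract its three callbacks maintain for the `put_file` call sites earlier in this patch (the real `AuxFileSizeEstimator` feeds metrics and lives elsewhere in the pageserver; this just shows the invariant):

    // Toy model: the estimate tracks the total logical size of all aux files.
    #[derive(Default)]
    struct SizeEstimate(i64);

    impl SizeEstimate {
        fn on_add(&mut self, new_len: usize) { self.0 += new_len as i64; }
        fn on_remove(&mut self, old_len: usize) { self.0 -= old_len as i64; }
        fn on_update(&mut self, old_len: usize, new_len: usize) {
            self.0 += new_len as i64 - old_len as i64;
        }
    }
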
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index 305c5758cc..71b9e4e288 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -283,8 +283,6 @@ impl DeleteTimelineFlow {
             // Important. We dont pass ancestor above because it can be missing.
             // Thus we need to skip the validation here.
             CreateTimelineCause::Delete,
-            // Aux file policy is not needed for deletion, assuming deletion does not read aux keyspace
-            None,
         )
         .context("create_timeline_struct")?;
 
diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs
index facf01004c..c067787f97 100644
--- a/pageserver/src/walredo/apply_neon.rs
+++ b/pageserver/src/walredo/apply_neon.rs
@@ -1,8 +1,7 @@
-use crate::pgdatadir_mapping::AuxFilesDirectory;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use byteorder::{ByteOrder, LittleEndian};
-use bytes::{BufMut, BytesMut};
+use bytes::BytesMut;
 use pageserver_api::key::Key;
 use pageserver_api::reltag::SlruKind;
 use postgres_ffi::pg_constants;
@@ -13,7 +12,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
 };
 use postgres_ffi::BLCKSZ;
 use tracing::*;
-use utils::bin_ser::BeSer;
 use utils::lsn::Lsn;
 
 /// Can this request be served by neon redo functions
@@ -236,13 +234,9 @@ pub(crate) fn apply_in_neon(
                 LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
             }
         }
-        NeonWalRecord::AuxFile { file_path, content } => {
-            let mut dir = AuxFilesDirectory::des(page)?;
-            dir.upsert(file_path.clone(), content.clone());
-
-            page.clear();
-            let mut writer = page.writer();
-            dir.ser_into(&mut writer)?;
+        NeonWalRecord::AuxFile { .. } => {
+            // No-op: this record will never be created in aux v2.
+            warn!("AuxFile record should not be created in aux v2");
         }
         #[cfg(test)]
         NeonWalRecord::Test {
@@ -250,6 +244,7 @@ pub(crate) fn apply_in_neon(
             clear,
             will_init,
         } => {
+            use bytes::BufMut;
             if *will_init {
                 assert!(*clear, "init record must be clear to ensure correctness");
             }
@@ -261,59 +256,3 @@ pub(crate) fn apply_in_neon(
     }
     Ok(())
 }
-
-#[cfg(test)]
-mod test {
-    use bytes::Bytes;
-    use pageserver_api::key::AUX_FILES_KEY;
-
-    use super::*;
-    use std::collections::HashMap;
-
-    /// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
-    #[test]
-    fn apply_aux_file_deltas() -> anyhow::Result<()> {
-        let base_dir = AuxFilesDirectory {
-            files: HashMap::from([
-                ("two".to_string(), Bytes::from_static(b"content0")),
-                ("three".to_string(), Bytes::from_static(b"contentX")),
-            ]),
-        };
-        let base_image = AuxFilesDirectory::ser(&base_dir)?;
-
-        let deltas = vec![
-            // Insert
-            NeonWalRecord::AuxFile {
-                file_path: "one".to_string(),
-                content: Some(Bytes::from_static(b"content1")),
-            },
-            // Update
-            NeonWalRecord::AuxFile {
-                file_path: "two".to_string(),
-                content: Some(Bytes::from_static(b"content99")),
-            },
-            // Delete
-            NeonWalRecord::AuxFile {
-                file_path: "three".to_string(),
-                content: None,
-            },
-        ];
-
-        let file_path = AUX_FILES_KEY;
-        let mut page = BytesMut::from_iter(base_image);
-
-        for record in deltas {
-            apply_in_neon(&record, Lsn(8), file_path, &mut page)?;
-        }
-
-        let reconstructed = AuxFilesDirectory::des(&page)?;
-        let expect = HashMap::from([
-            ("one".to_string(), Bytes::from_static(b"content1")),
-            ("two".to_string(), Bytes::from_static(b"content99")),
-        ]);
-
-        assert_eq!(reconstructed.files, expect);
-
-        Ok(())
-    }
-}
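Reviewer note: the deleted walredo unit test above exercised the upsert semantics of the removed `AuxFilesDirectory`: a delta carrying `Some(content)` inserts or overwrites an entry, `None` deletes it. For the record, that behavior in isolation, on a plain map:

    use std::collections::HashMap;

    // Equivalent of the removed AuxFilesDirectory::upsert.
    fn upsert(files: &mut HashMap<String, Vec<u8>>, key: String, value: Option<Vec<u8>>) {
        match value {
            Some(v) => {
                files.insert(key, v); // insert or overwrite
            }
            None => {
                files.remove(&key); // delete
            }
        }
    }
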
diff --git a/test_runner/regress/test_aux_files.py b/test_runner/regress/test_aux_files.py
deleted file mode 100644
index 91d674d0db..0000000000
--- a/test_runner/regress/test_aux_files.py
+++ /dev/null
@@ -1,78 +0,0 @@
-from __future__ import annotations
-
-from fixtures.log_helper import log
-from fixtures.neon_fixtures import (
-    AuxFileStore,
-    NeonEnvBuilder,
-    logical_replication_sync,
-)
-
-
-def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
-    env = neon_env_builder.init_start()
-    endpoint = env.endpoints.create_start("main")
-    client = env.pageserver.http_client()
-
-    tenant_id = env.initial_tenant
-    timeline_id = env.initial_timeline
-
-    tenant_config = client.tenant_config(tenant_id).effective_config
-    tenant_config["switch_aux_file_policy"] = AuxFileStore.V2
-    client.set_tenant_config(tenant_id, tenant_config)
-    # aux file v2 is enabled on the write path, so for now, it should be unset (or null)
-    assert (
-        client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"]
-        is None
-    )
-
-    pg_conn = endpoint.connect()
-    cur = pg_conn.cursor()
-
-    cur.execute("create table t(pk integer primary key, payload integer)")
-    cur.execute(
-        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
-    )
-    cur.execute("create publication pub1 for table t, replication_example")
-
-    # now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
-    # instead of going through the full logical replication process.
-    vanilla_pg.start()
-    vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
-    vanilla_pg.safe_psql(
-        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
-    )
-    connstr = endpoint.connstr().replace("'", "''")
-    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
-    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
-
-    # Wait logical replication channel to be established
-    logical_replication_sync(vanilla_pg, endpoint)
-    vanilla_pg.stop()
-    endpoint.stop()
-
-    with env.pageserver.http_client() as client:
-        # aux file v2 flag should be enabled at this point
-        assert (
-            client.timeline_detail(tenant_id, timeline_id)["last_aux_file_policy"]
-            == AuxFileStore.V2
-        )
-    with env.pageserver.http_client() as client:
-        tenant_config = client.tenant_config(tenant_id).effective_config
-        tenant_config["switch_aux_file_policy"] = "V1"
-        client.set_tenant_config(tenant_id, tenant_config)
-        # the flag should still be enabled
-        assert (
-            client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
-                "last_aux_file_policy"
-            ]
-            == AuxFileStore.V2
-        )
-    env.pageserver.restart()
-    with env.pageserver.http_client() as client:
-        # aux file v2 flag should be persisted
-        assert (
-            client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
-                "last_aux_file_policy"
-            ]
-            == AuxFileStore.V2
-        )