From 22a787ec9c07625907ff8583160bd56fa2102f64 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 22 Feb 2024 14:39:18 +0300 Subject: [PATCH] Zero aux files on any aux file ingest. --- pageserver/src/basebackup.rs | 44 ++++---- pageserver/src/pgdatadir_mapping.rs | 154 +++++++++++++++------------- pageserver/src/walingest.rs | 9 +- 3 files changed, 107 insertions(+), 100 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 0b04ba406d..08bd3f6774 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -13,9 +13,9 @@ use anyhow::{anyhow, bail, ensure, Context}; use bytes::{BufMut, Bytes, BytesMut}; use fail::fail_point; -use hex::FromHex; + use pageserver_api::key::{key_to_slru_block, Key}; -use postgres_ffi::pg_constants; + use std::fmt::Write as FmtWrite; use std::time::SystemTime; use tokio::io; @@ -274,7 +274,7 @@ where slru_builder.finish().await?; } - let mut min_restart_lsn: Lsn = Lsn::MAX; + let min_restart_lsn: Lsn = Lsn::MAX; // Create tablespace directories for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn, self.ctx).await? @@ -309,26 +309,24 @@ where } } - // one-off hack: disable listing aux files for this tenant - if self.timeline.tenant_shard_id.tenant_id - != utils::id::TenantId::from_hex("94dfb564f25964764b19763f337730ce").unwrap() - { - for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? { - if path.starts_with("pg_replslot") { - let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; - let restart_lsn = Lsn(u64::from_le_bytes( - content[offs..offs + 8].try_into().unwrap(), - )); - info!("Replication slot {} restart LSN={}", path, restart_lsn); - min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); - } - let header = new_tar_header(&path, content.len() as u64)?; - self.ar - .append(&header, &*content) - .await - .context("could not add aux file to basebackup tarball")?; - } - } + // one-off hack: disable listing aux files + // { + // for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? { + // if path.starts_with("pg_replslot") { + // let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; + // let restart_lsn = Lsn(u64::from_le_bytes( + // content[offs..offs + 8].try_into().unwrap(), + // )); + // info!("Replication slot {} restart LSN={}", path, restart_lsn); + // min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); + // } + // let header = new_tar_header(&path, content.len() as u64)?; + // self.ar + // .append(&header, &*content) + // .await + // .context("could not add aux file to basebackup tarball")?; + // } + // } } if min_restart_lsn != Lsn::MAX { info!( diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0ff03303d4..ecdb271f07 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -671,6 +671,7 @@ impl Timeline { self.get(CHECKPOINT_KEY, lsn, ctx).await } + #[allow(dead_code)] pub(crate) async fn list_aux_files( &self, lsn: Lsn, @@ -1389,81 +1390,96 @@ impl<'a> DatadirModification<'a> { pub async fn put_file( &mut self, - path: &str, - content: &[u8], - ctx: &RequestContext, + _path: &str, + _content: &[u8], + _ctx: &RequestContext, ) -> anyhow::Result<()> { - let file_path = path.to_string(); - let content = if content.is_empty() { - None - } else { - Some(Bytes::copy_from_slice(content)) + // one-off hack: zero aux files on any ingest + let dir = AuxFilesDirectory { + files: HashMap::new(), }; - - let dir = if let Some(mut dir) = self.pending_aux_files.take() { - // We already updated aux files in `self`: emit a delta and update our latest value - - self.put( - AUX_FILES_KEY, - Value::WalRecord(NeonWalRecord::AuxFile { - file_path: file_path.clone(), - content: content.clone(), - }), - ); - - dir.upsert(file_path, content); - dir - } else { - // Check if the AUX_FILES_KEY is initialized - match self.get(AUX_FILES_KEY, ctx).await { - Ok(dir_bytes) => { - let mut dir = AuxFilesDirectory::des(&dir_bytes)?; - // Key is already set, we may append a delta - self.put( - AUX_FILES_KEY, - Value::WalRecord(NeonWalRecord::AuxFile { - file_path: file_path.clone(), - content: content.clone(), - }), - ); - dir.upsert(file_path, content); - dir - } - Err( - e @ (PageReconstructError::AncestorStopping(_) - | PageReconstructError::Cancelled - | PageReconstructError::AncestorLsnTimeout(_)), - ) => { - // Important that we do not interpret a shutdown error as "not found" and thereby - // reset the map. - return Err(e.into()); - } - // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so - // we are assuming that all _other_ possible errors represents a missing key. If some - // other error occurs, we may incorrectly reset the map of aux files. - Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => { - // Key is missing, we must insert an image as the basis for subsequent deltas. - - let mut dir = AuxFilesDirectory { - files: HashMap::new(), - }; - dir.upsert(file_path, content); - self.put( - AUX_FILES_KEY, - Value::Image(Bytes::from( - AuxFilesDirectory::ser(&dir).context("serialize")?, - )), - ); - dir - } - } - }; - + self.put( + AUX_FILES_KEY, + Value::Image(Bytes::from( + AuxFilesDirectory::ser(&dir).context("serialize")?, + )), + ); self.pending_directory_entries .push((DirectoryKind::AuxFiles, dir.files.len())); self.pending_aux_files = Some(dir); - Ok(()) + + // let file_path = path.to_string(); + // let content = if content.is_empty() { + // None + // } else { + // Some(Bytes::copy_from_slice(content)) + // }; + + // let dir = if let Some(mut dir) = self.pending_aux_files.take() { + // // We already updated aux files in `self`: emit a delta and update our latest value + + // self.put( + // AUX_FILES_KEY, + // Value::WalRecord(NeonWalRecord::AuxFile { + // file_path: file_path.clone(), + // content: content.clone(), + // }), + // ); + + // dir.upsert(file_path, content); + // dir + // } else { + // // Check if the AUX_FILES_KEY is initialized + // match self.get(AUX_FILES_KEY, ctx).await { + // Ok(dir_bytes) => { + // let mut dir = AuxFilesDirectory::des(&dir_bytes)?; + // // Key is already set, we may append a delta + // self.put( + // AUX_FILES_KEY, + // Value::WalRecord(NeonWalRecord::AuxFile { + // file_path: file_path.clone(), + // content: content.clone(), + // }), + // ); + // dir.upsert(file_path, content); + // dir + // } + // Err( + // e @ (PageReconstructError::AncestorStopping(_) + // | PageReconstructError::Cancelled + // | PageReconstructError::AncestorLsnTimeout(_)), + // ) => { + // // Important that we do not interpret a shutdown error as "not found" and thereby + // // reset the map. + // return Err(e.into()); + // } + // // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so + // // we are assuming that all _other_ possible errors represents a missing key. If some + // // other error occurs, we may incorrectly reset the map of aux files. + // Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => { + // // Key is missing, we must insert an image as the basis for subsequent deltas. + + // let mut dir = AuxFilesDirectory { + // files: HashMap::new(), + // }; + // dir.upsert(file_path, content); + // self.put( + // AUX_FILES_KEY, + // Value::Image(Bytes::from( + // AuxFilesDirectory::ser(&dir).context("serialize")?, + // )), + // ); + // dir + // } + // } + // }; + + // self.pending_directory_entries + // .push((DirectoryKind::AuxFiles, dir.files.len())); + // self.pending_aux_files = Some(dir); + + // Ok(()) } /// diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 7d0092e235..8df2f1713a 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -28,7 +28,6 @@ use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; use anyhow::{bail, Context, Result}; use bytes::{Buf, Bytes, BytesMut}; -use hex::FromHex; use tracing::*; use utils::failpoint_support; @@ -357,13 +356,7 @@ impl WalIngest { // a particular string, for example, but this is enough for now. failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); } else if let Some(path) = prefix.strip_prefix("neon-file:") { - // one off hack: disable aux file ingestion for this tenant - if modification.tline.tenant_shard_id.tenant_id - != utils::id::TenantId::from_hex("94dfb564f25964764b19763f337730ce") - .unwrap() - { - modification.put_file(path, message, ctx).await?; - } + modification.put_file(path, message, ctx).await?; } } }