Compare commits

...

2 Commits

Author SHA1 Message Date
Arseny Sher
22a787ec9c Zero aux files on any aux file ingest. 2024-02-22 14:59:29 +03:00
Arseny Sher
9c662e65a2 Ignore aux files ingest and listing in basebackup for one tenant.
As one off hack to make it run right now.
2024-02-22 09:50:14 +03:00
2 changed files with 106 additions and 86 deletions

View File

@@ -13,8 +13,9 @@
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{key_to_slru_block, Key};
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
use tokio::io;
@@ -273,7 +274,7 @@ where
slru_builder.finish().await?;
}
let mut min_restart_lsn: Lsn = Lsn::MAX;
let min_restart_lsn: Lsn = Lsn::MAX;
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
@@ -308,21 +309,24 @@ where
}
}
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
}
// one-off hack: disable listing aux files
// {
// for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
// if path.starts_with("pg_replslot") {
// let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
// let restart_lsn = Lsn(u64::from_le_bytes(
// content[offs..offs + 8].try_into().unwrap(),
// ));
// info!("Replication slot {} restart LSN={}", path, restart_lsn);
// min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
// }
// let header = new_tar_header(&path, content.len() as u64)?;
// self.ar
// .append(&header, &*content)
// .await
// .context("could not add aux file to basebackup tarball")?;
// }
// }
}
if min_restart_lsn != Lsn::MAX {
info!(

View File

@@ -671,6 +671,7 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
#[allow(dead_code)]
pub(crate) async fn list_aux_files(
&self,
lsn: Lsn,
@@ -1389,81 +1390,96 @@ impl<'a> DatadirModification<'a> {
pub async fn put_file(
&mut self,
path: &str,
content: &[u8],
ctx: &RequestContext,
_path: &str,
_content: &[u8],
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
// one-off hack: zero aux files on any ingest
let dir = AuxFilesDirectory {
files: HashMap::new(),
};
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
// We already updated aux files in `self`: emit a delta and update our latest value
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// we are assuming that all _other_ possible errors represents a missing key. If some
// other error occurs, we may incorrectly reset the map of aux files.
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
dir
}
}
};
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, dir.files.len()));
self.pending_aux_files = Some(dir);
Ok(())
// let file_path = path.to_string();
// let content = if content.is_empty() {
// None
// } else {
// Some(Bytes::copy_from_slice(content))
// };
// let dir = if let Some(mut dir) = self.pending_aux_files.take() {
// // We already updated aux files in `self`: emit a delta and update our latest value
// self.put(
// AUX_FILES_KEY,
// Value::WalRecord(NeonWalRecord::AuxFile {
// file_path: file_path.clone(),
// content: content.clone(),
// }),
// );
// dir.upsert(file_path, content);
// dir
// } else {
// // Check if the AUX_FILES_KEY is initialized
// match self.get(AUX_FILES_KEY, ctx).await {
// Ok(dir_bytes) => {
// let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// // Key is already set, we may append a delta
// self.put(
// AUX_FILES_KEY,
// Value::WalRecord(NeonWalRecord::AuxFile {
// file_path: file_path.clone(),
// content: content.clone(),
// }),
// );
// dir.upsert(file_path, content);
// dir
// }
// Err(
// e @ (PageReconstructError::AncestorStopping(_)
// | PageReconstructError::Cancelled
// | PageReconstructError::AncestorLsnTimeout(_)),
// ) => {
// // Important that we do not interpret a shutdown error as "not found" and thereby
// // reset the map.
// return Err(e.into());
// }
// // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// // we are assuming that all _other_ possible errors represents a missing key. If some
// // other error occurs, we may incorrectly reset the map of aux files.
// Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// // Key is missing, we must insert an image as the basis for subsequent deltas.
// let mut dir = AuxFilesDirectory {
// files: HashMap::new(),
// };
// dir.upsert(file_path, content);
// self.put(
// AUX_FILES_KEY,
// Value::Image(Bytes::from(
// AuxFilesDirectory::ser(&dir).context("serialize")?,
// )),
// );
// dir
// }
// }
// };
// self.pending_directory_entries
// .push((DirectoryKind::AuxFiles, dir.files.len()));
// self.pending_aux_files = Some(dir);
// Ok(())
}
///