mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Zero aux files on any aux file ingest.
This commit is contained in:
@@ -13,9 +13,9 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
use hex::FromHex;
|
||||
|
||||
use pageserver_api::key::{key_to_slru_block, Key};
|
||||
use postgres_ffi::pg_constants;
|
||||
|
||||
use std::fmt::Write as FmtWrite;
|
||||
use std::time::SystemTime;
|
||||
use tokio::io;
|
||||
@@ -274,7 +274,7 @@ where
|
||||
slru_builder.finish().await?;
|
||||
}
|
||||
|
||||
let mut min_restart_lsn: Lsn = Lsn::MAX;
|
||||
let min_restart_lsn: Lsn = Lsn::MAX;
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
|
||||
@@ -309,26 +309,24 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// one-off hack: disable listing aux files for this tenant
|
||||
if self.timeline.tenant_shard_id.tenant_id
|
||||
!= utils::id::TenantId::from_hex("94dfb564f25964764b19763f337730ce").unwrap()
|
||||
{
|
||||
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
}
|
||||
}
|
||||
// one-off hack: disable listing aux files
|
||||
// {
|
||||
// for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
// if path.starts_with("pg_replslot") {
|
||||
// let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
// let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
// content[offs..offs + 8].try_into().unwrap(),
|
||||
// ));
|
||||
// info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
// min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
// }
|
||||
// let header = new_tar_header(&path, content.len() as u64)?;
|
||||
// self.ar
|
||||
// .append(&header, &*content)
|
||||
// .await
|
||||
// .context("could not add aux file to basebackup tarball")?;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
if min_restart_lsn != Lsn::MAX {
|
||||
info!(
|
||||
|
||||
@@ -671,6 +671,7 @@ impl Timeline {
|
||||
self.get(CHECKPOINT_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn list_aux_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
@@ -1389,81 +1390,96 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
pub async fn put_file(
|
||||
&mut self,
|
||||
path: &str,
|
||||
content: &[u8],
|
||||
ctx: &RequestContext,
|
||||
_path: &str,
|
||||
_content: &[u8],
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let file_path = path.to_string();
|
||||
let content = if content.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Bytes::copy_from_slice(content))
|
||||
// one-off hack: zero aux files on any ingest
|
||||
let dir = AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
};
|
||||
|
||||
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
|
||||
// We already updated aux files in `self`: emit a delta and update our latest value
|
||||
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
|
||||
dir.upsert(file_path, content);
|
||||
dir
|
||||
} else {
|
||||
// Check if the AUX_FILES_KEY is initialized
|
||||
match self.get(AUX_FILES_KEY, ctx).await {
|
||||
Ok(dir_bytes) => {
|
||||
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
|
||||
// Key is already set, we may append a delta
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
dir.upsert(file_path, content);
|
||||
dir
|
||||
}
|
||||
Err(
|
||||
e @ (PageReconstructError::AncestorStopping(_)
|
||||
| PageReconstructError::Cancelled
|
||||
| PageReconstructError::AncestorLsnTimeout(_)),
|
||||
) => {
|
||||
// Important that we do not interpret a shutdown error as "not found" and thereby
|
||||
// reset the map.
|
||||
return Err(e.into());
|
||||
}
|
||||
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
|
||||
// we are assuming that all _other_ possible errors represents a missing key. If some
|
||||
// other error occurs, we may incorrectly reset the map of aux files.
|
||||
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
|
||||
// Key is missing, we must insert an image as the basis for subsequent deltas.
|
||||
|
||||
let mut dir = AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
};
|
||||
dir.upsert(file_path, content);
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
dir
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, dir.files.len()));
|
||||
self.pending_aux_files = Some(dir);
|
||||
|
||||
Ok(())
|
||||
|
||||
// let file_path = path.to_string();
|
||||
// let content = if content.is_empty() {
|
||||
// None
|
||||
// } else {
|
||||
// Some(Bytes::copy_from_slice(content))
|
||||
// };
|
||||
|
||||
// let dir = if let Some(mut dir) = self.pending_aux_files.take() {
|
||||
// // We already updated aux files in `self`: emit a delta and update our latest value
|
||||
|
||||
// self.put(
|
||||
// AUX_FILES_KEY,
|
||||
// Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
// file_path: file_path.clone(),
|
||||
// content: content.clone(),
|
||||
// }),
|
||||
// );
|
||||
|
||||
// dir.upsert(file_path, content);
|
||||
// dir
|
||||
// } else {
|
||||
// // Check if the AUX_FILES_KEY is initialized
|
||||
// match self.get(AUX_FILES_KEY, ctx).await {
|
||||
// Ok(dir_bytes) => {
|
||||
// let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
|
||||
// // Key is already set, we may append a delta
|
||||
// self.put(
|
||||
// AUX_FILES_KEY,
|
||||
// Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
// file_path: file_path.clone(),
|
||||
// content: content.clone(),
|
||||
// }),
|
||||
// );
|
||||
// dir.upsert(file_path, content);
|
||||
// dir
|
||||
// }
|
||||
// Err(
|
||||
// e @ (PageReconstructError::AncestorStopping(_)
|
||||
// | PageReconstructError::Cancelled
|
||||
// | PageReconstructError::AncestorLsnTimeout(_)),
|
||||
// ) => {
|
||||
// // Important that we do not interpret a shutdown error as "not found" and thereby
|
||||
// // reset the map.
|
||||
// return Err(e.into());
|
||||
// }
|
||||
// // FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
|
||||
// // we are assuming that all _other_ possible errors represents a missing key. If some
|
||||
// // other error occurs, we may incorrectly reset the map of aux files.
|
||||
// Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
|
||||
// // Key is missing, we must insert an image as the basis for subsequent deltas.
|
||||
|
||||
// let mut dir = AuxFilesDirectory {
|
||||
// files: HashMap::new(),
|
||||
// };
|
||||
// dir.upsert(file_path, content);
|
||||
// self.put(
|
||||
// AUX_FILES_KEY,
|
||||
// Value::Image(Bytes::from(
|
||||
// AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
// )),
|
||||
// );
|
||||
// dir
|
||||
// }
|
||||
// }
|
||||
// };
|
||||
|
||||
// self.pending_directory_entries
|
||||
// .push((DirectoryKind::AuxFiles, dir.files.len()));
|
||||
// self.pending_aux_files = Some(dir);
|
||||
|
||||
// Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -28,7 +28,6 @@ use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use hex::FromHex;
|
||||
use tracing::*;
|
||||
use utils::failpoint_support;
|
||||
|
||||
@@ -357,13 +356,7 @@ impl WalIngest {
|
||||
// a particular string, for example, but this is enough for now.
|
||||
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
|
||||
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
|
||||
// one off hack: disable aux file ingestion for this tenant
|
||||
if modification.tline.tenant_shard_id.tenant_id
|
||||
!= utils::id::TenantId::from_hex("94dfb564f25964764b19763f337730ce")
|
||||
.unwrap()
|
||||
{
|
||||
modification.put_file(path, message, ctx).await?;
|
||||
}
|
||||
modification.put_file(path, message, ctx).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user