Merge branch 'fast-sort' of github.com:neondatabase/neon into fast-sort

This commit is contained in:
Bojan Serafimov
2023-11-03 14:44:58 -04:00
7 changed files with 305 additions and 183 deletions

View File

@@ -60,8 +60,8 @@ impl<T> OnceCell<T> {
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
@@ -72,28 +72,55 @@ impl<T> OnceCell<T> {
};
let permit = sem.acquire_owned().await;
if permit.is_err() {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
} else {
// now we try
let value = factory().await?;
let mut guard = self.inner.lock().unwrap();
assert!(
guard.value.is_none(),
"we won permit, must not be initialized"
);
guard.value = Some(value);
guard.init_semaphore.close();
Ok(Guard(guard))
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.lock().unwrap();
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
}
}
}
/// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used
/// to complete initializing the inner value.
///
/// # Panics
///
/// If the inner has already been initialized.
pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> {
// cannot assert that this permit is for self.inner.semaphore
let guard = self.inner.lock().unwrap();
if guard.init_semaphore.try_acquire().is_ok() {
drop(guard);
panic!("semaphore is of wrong origin");
}
Self::set0(value, guard)
}
fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner<T>>) -> Guard<'_, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
Guard(guard)
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
@@ -135,7 +162,7 @@ impl<'a, T> Guard<'a, T> {
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
@@ -145,11 +172,14 @@ impl<'a, T> Guard<'a, T> {
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, permit))
.map(|v| (v, InitPermit(permit)))
.expect("guard is not created unless value has been initialized")
}
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
#[cfg(test)]
mod tests {
use super::*;
@@ -185,11 +215,11 @@ mod tests {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|| {
.get_or_init(|permit| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
Ok::<_, Infallible>(i)
Ok::<_, Infallible>((i, permit))
}
})
.await
@@ -243,7 +273,7 @@ mod tests {
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) })
.await
.unwrap();
@@ -258,18 +288,32 @@ mod tests {
assert_eq!(*cell.get().unwrap(), reinit);
}
#[test]
fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
let (mol, permit) = cell.get().unwrap().take_and_deinit();
cell.set(5, permit);
assert_eq!(*cell.get().unwrap(), 5);
let (five, permit) = cell.get().unwrap().take_and_deinit();
assert_eq!(5, five);
cell.set(mol, permit);
assert_eq!(*cell.get().unwrap(), 42);
}
#[tokio::test]
async fn initialization_attemptable_until_ok() {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|| async { Err("whatever error") })
cell.get_or_init(|_permit| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("finally success") })
.get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) })
.await
.unwrap();
assert_eq!(*g, "finally success");
@@ -281,11 +325,11 @@ mod tests {
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|| async {
let initializer = cell.get_or_init(|permit| async {
barrier.wait().await;
futures::future::pending::<()>().await;
Ok::<_, Infallible>("never reached")
Ok::<_, Infallible>(("never reached", permit))
});
tokio::select! {
@@ -298,7 +342,7 @@ mod tests {
assert!(cell.get().is_none());
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
.get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) })
.await
.unwrap();
assert_eq!(*g, "now initialized");

View File

@@ -10,6 +10,7 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::VirtualFile;
use anyhow::Context;
use camino::Utf8PathBuf;
@@ -271,7 +272,9 @@ impl DeletionHeader {
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
.await
.map_err(Into::into)
.maybe_fatal_err("save deletion header")?;
Ok(())
}
}
@@ -360,6 +363,7 @@ impl DeletionList {
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
}
}

View File

@@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::MaybeFatalIo;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -195,7 +197,7 @@ impl ListWriter {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
Err(anyhow::anyhow!(e))
on_fatal_io_error(&e, "reading deletion header");
}
}
}
@@ -216,16 +218,9 @@ impl ListWriter {
self.pending.sequence = validated_sequence + 1;
let deletion_directory = self.conf.deletion_prefix();
let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
Ok(d) => d,
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
let mut dir = tokio::fs::read_dir(&deletion_directory)
.await
.fatal_err("read deletion directory");
let list_name_pattern =
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
@@ -233,7 +228,7 @@ impl ListWriter {
let temp_extension = format!(".{TEMP_SUFFIX}");
let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await? {
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
let file_name = dentry.file_name();
let dentry_str = file_name.to_string_lossy();
@@ -246,11 +241,9 @@ impl ListWriter {
info!("Cleaning up temporary file {dentry_str}");
let absolute_path =
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
}
tokio::fs::remove_file(&absolute_path)
.await
.fatal_err("delete temp file");
continue;
}
@@ -290,7 +283,9 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path).await?;
let list_bytes = tokio::fs::read(&list_path)
.await
.fatal_err("read deletion list");
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,

View File

@@ -28,6 +28,7 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::MaybeFatalIo;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -287,16 +288,9 @@ where
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
for list_path in list_paths {
debug!("Removing deletion list {list_path}");
if let Err(e) = tokio::fs::remove_file(&list_path).await {
// Unexpected: we should have permissions and nothing else should
// be touching these files. We will leave the file behind. Subsequent
// pageservers will try and load it again: hopefully whatever storage
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
break;
}
tokio::fs::remove_file(&list_path)
.await
.fatal_err("remove deletion list");
}
}

View File

@@ -337,31 +337,39 @@ enum ResidentOrWantedEvicted {
}
impl ResidentOrWantedEvicted {
fn get(&self) -> Option<Arc<DownloadedLayer>> {
fn get_and_upgrade(&mut self) -> Option<(Arc<DownloadedLayer>, bool)> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)),
ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() {
Some(strong) => {
LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses();
Some(strong)
*self = ResidentOrWantedEvicted::Resident(strong.clone());
Some((strong, true))
}
None => None,
},
}
}
/// When eviction is first requested, drop down to holding a [`Weak`].
///
/// Returns `true` if this was the first time eviction was requested.
fn downgrade(&mut self) -> bool {
/// Returns `Some` if this was the first time eviction was requested. Care should be taken to
/// drop the possibly last strong reference outside of the mutex of
/// heavier_once_cell::OnceCell.
fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => {
let weak = Arc::downgrade(strong);
*self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
// returning the weak is not useful, because the drop could had already ran with
// the replacement above, and that will take care of cleaning the Option we are in
true
let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version);
std::mem::swap(self, &mut temp);
match temp {
ResidentOrWantedEvicted::Resident(strong) => Some(strong),
ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"),
}
}
ResidentOrWantedEvicted::WantedEvicted(..) => false,
ResidentOrWantedEvicted::WantedEvicted(..) => None,
}
}
}
@@ -563,20 +571,22 @@ impl LayerInner {
let mut rx = self.status.subscribe();
let res =
self.wanted_evicted
.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
let strong = {
match self.inner.get() {
Some(mut either) => {
self.wanted_evicted.store(true, Ordering::Relaxed);
either.downgrade()
}
None => return Err(EvictionError::NotFound),
}
};
if res.is_ok() {
if strong.is_some() {
// drop the DownloadedLayer outside of the holding the guard
drop(strong);
LAYER_IMPL_METRICS.inc_started_evictions();
}
if self.get().is_none() {
// it was not evictable in the first place
// our store to the wanted_evicted does not matter; it will be reset by next download
return Err(EvictionError::NotFound);
}
match rx.recv().await {
Ok(Status::Evicted) => Ok(()),
Ok(Status::Downloaded) => Err(EvictionError::Downloaded),
@@ -590,7 +600,8 @@ impl LayerInner {
//
// use however late (compared to the initial expressing of wanted) as the
// "outcome" now
match self.get() {
LAYER_IMPL_METRICS.inc_broadcast_lagged();
match self.inner.get() {
Some(_) => Err(EvictionError::Downloaded),
None => Ok(()),
}
@@ -605,8 +616,10 @@ impl LayerInner {
allow_download: bool,
ctx: Option<&RequestContext>,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let mut init_permit = None;
loop {
let download = move || async move {
let download = move |permit| async move {
// disable any scheduled but not yet running eviction deletions for this
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
@@ -627,7 +640,11 @@ impl LayerInner {
.await
.map_err(DownloadError::PreStatFailed)?;
if let Some(reason) = needs_download {
let permit = if let Some(reason) = needs_download {
if let NeedsDownload::NotFile(ft) = reason {
return Err(DownloadError::NotFile(ft));
}
// only reset this after we've decided we really need to download. otherwise it'd
// be impossible to mark cancelled downloads for eviction, like one could imagine
// we would like to do for prefetching which was not needed.
@@ -649,12 +666,14 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
self.spawn_download_and_wait(timeline).await?;
self.spawn_download_and_wait(timeline, permit).await?
} else {
// the file is present locally, probably by a previous but cancelled call to
// get_or_maybe_download. alternatively we might be running without remote storage.
LAYER_IMPL_METRICS.inc_init_needed_no_download();
}
permit
};
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
@@ -667,19 +686,55 @@ impl LayerInner {
LayerResidenceEventReason::ResidenceChange,
);
Ok(ResidentOrWantedEvicted::Resident(res))
Ok((ResidentOrWantedEvicted::Resident(res), permit))
};
let locked = self.inner.get_or_init(download).await?;
if let Some(strong) = Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted)
{
if let Some(init_permit) = init_permit.take() {
// use the already held initialization permit because it is impossible to hit the
// below paths anymore essentially limiting the max loop iterations to 2.
let (value, init_permit) = download(init_permit).await?;
let mut guard = self.inner.set(value, init_permit);
let (strong, _upgraded) = guard
.get_and_upgrade()
.expect("init creates strong reference, we held the init permit");
return Ok(strong);
}
// the situation in which we might need to retry is that our init was ready
// immediatedly, but the DownloadedLayer had been dropped BUT failed to complete
// Self::evict_blocking
let (weak, permit) = {
let mut locked = self.inner.get_or_init(download).await?;
if let Some((strong, upgraded)) = locked.get_and_upgrade() {
if upgraded {
// when upgraded back, the Arc<DownloadedLayer> is still available, but
// previously a `evict_and_wait` was received.
self.wanted_evicted.store(false, Ordering::Relaxed);
// error out any `evict_and_wait`
drop(self.status.send(Status::Downloaded));
LAYER_IMPL_METRICS
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
}
return Ok(strong);
} else {
// path to here: the evict_blocking is stuck on spawn_blocking queue.
//
// reset the contents, deactivating the eviction and causing a
// EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed.
locked.take_and_deinit()
}
};
// unlock first, then drop the weak, but because upgrade failed, we
// know it cannot be a problem.
assert!(
matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
"unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
);
init_permit = Some(permit);
LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download();
}
}
@@ -714,10 +769,12 @@ impl LayerInner {
async fn spawn_download_and_wait(
self: &Arc<Self>,
timeline: Arc<Timeline>,
) -> Result<(), DownloadError> {
permit: heavier_once_cell::InitPermit,
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
let task_name = format!("download layer {}", self);
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
@@ -751,9 +808,9 @@ impl LayerInner {
}
};
if let Err(res) = tx.send(result) {
if let Err(res) = tx.send((result, permit)) {
match res {
Ok(()) => {
(Ok(()), _) => {
// our caller is cancellation safe so this is fine; if someone
// else requests the layer, they'll find it already downloaded
// or redownload.
@@ -764,7 +821,7 @@ impl LayerInner {
tracing::info!("layer file download completed after requester had cancelled");
LAYER_IMPL_METRICS.inc_download_completed_without_requester();
},
Err(e) => {
(Err(e), _) => {
// our caller is cancellation safe, but we might be racing with
// another attempt to initialize. before we have cancellation
// token support: these attempts should converge regardless of
@@ -780,7 +837,7 @@ impl LayerInner {
.in_current_span(),
);
match rx.await {
Ok(Ok(())) => {
Ok((Ok(()), permit)) => {
if let Some(reason) = self
.needs_download()
.await
@@ -792,9 +849,10 @@ impl LayerInner {
self.consecutive_failures.store(0, Ordering::Relaxed);
Ok(())
Ok(permit)
}
Ok(Err(e)) => {
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
@@ -812,33 +870,6 @@ impl LayerInner {
}
}
/// Access the current state without waiting for the file to be downloaded.
///
/// Requires that we've initialized to state which is respective to the
/// actual residency state.
fn get(&self) -> Option<Arc<DownloadedLayer>> {
let locked = self.inner.get();
Self::get_or_apply_evictedness(locked, &self.wanted_evicted)
}
fn get_or_apply_evictedness(
guard: Option<heavier_once_cell::Guard<'_, ResidentOrWantedEvicted>>,
wanted_evicted: &AtomicBool,
) -> Option<Arc<DownloadedLayer>> {
if let Some(mut x) = guard {
if let Some(won) = x.get() {
// there are no guarantees that we will always get to observe a concurrent call
// to evict
if wanted_evicted.load(Ordering::Acquire) {
x.downgrade();
}
return Some(won);
}
}
None
}
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match tokio::fs::metadata(&self.path).await {
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
@@ -858,7 +889,7 @@ impl LayerInner {
fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> {
// in future, this should include sha2-256 validation of the file.
if !m.is_file() {
Err(NeedsDownload::NotFile)
Err(NeedsDownload::NotFile(m.file_type()))
} else if m.len() != self.desc.file_size {
Err(NeedsDownload::WrongSize {
actual: m.len(),
@@ -872,7 +903,9 @@ impl LayerInner {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
let remote = self.get().is_none();
// this is not accurate: we could have the file locally but there was a cancellation
// and now we are not in sync, or we are currently downloading it.
let remote = self.inner.get().is_none();
let access_stats = self.access_stats.as_api_model(reset);
@@ -1055,6 +1088,8 @@ enum DownloadError {
ContextAndConfigReallyDeniesDownloads,
#[error("downloading is really required but not allowed by this method")]
DownloadRequired,
#[error("layer path exists, but it is not a file: {0:?}")]
NotFile(std::fs::FileType),
/// Why no error here? Because it will be reported by page_service. We should had also done
/// retries already.
#[error("downloading evicted layer file failed")]
@@ -1070,7 +1105,7 @@ enum DownloadError {
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile,
NotFile(std::fs::FileType),
WrongSize { actual: u64, expected: u64 },
}
@@ -1078,7 +1113,7 @@ impl std::fmt::Display for NeedsDownload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NeedsDownload::NotFound => write!(f, "file was not found"),
NeedsDownload::NotFile => write!(f, "path is not a file"),
NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"),
NeedsDownload::WrongSize { actual, expected } => {
write!(f, "file size mismatch {actual} vs. {expected}")
}
@@ -1456,6 +1491,13 @@ impl LayerImplMetrics {
.unwrap()
.inc();
}
fn inc_broadcast_lagged(&self) {
self.rare_counters
.get_metric_with_label_values(&["broadcast_lagged"])
.unwrap()
.inc();
}
}
enum EvictionCancelled {
@@ -1467,6 +1509,8 @@ enum EvictionCancelled {
AlreadyReinitialized,
/// Not evicted because of a pending reinitialization
LostToDownload,
/// After eviction, there was a new layer access which cancelled the eviction.
UpgradedBackOnAccess,
}
impl EvictionCancelled {
@@ -1479,6 +1523,7 @@ impl EvictionCancelled {
EvictionCancelled::RemoveFailed => "remove_failed",
EvictionCancelled::AlreadyReinitialized => "already_reinitialized",
EvictionCancelled::LostToDownload => "lost_to_download",
EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access",
}
}
}

View File

@@ -19,6 +19,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use utils::fs_ext;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -173,37 +174,78 @@ impl OpenFiles {
}
}
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir")]
SyncFinalPathParentDir(#[source] std::io::Error),
/// Identify error types that should alwways terminate the process. Other
/// error types may be elegible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.
true
}
Some(EROFS) => {
// Terminate on EROFS because a filesystem is usually remounted
// readonly when it has experienced some critical issue, so the same
// logic as EIO applies.
true
}
Some(EACCES) => {
// Terminate on EACCESS because we should always have permissions
// for our own data dir: if we don't, then we can't do our job and
// need administrative intervention to fix permissions. Terminating
// is the best way to make sure we stop cleanly rather than going
// into infinite retry loops, and will make it clear to the outside
// world that we need help.
true
}
_ => {
// Treat all other local file I/O errors are retryable. This includes:
// - ENOSPC: we stay up and wait for eviction to free some space
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
// - WriteZero, Interrupted: these are used internally VirtualFile
false
}
}
}
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
tracing::error!("Fatal I/O error: {e}: {context})");
std::process::abort();
}
pub(crate) trait MaybeFatalIo<T> {
fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
fn fatal_err(self, context: &str) -> T;
}
impl<T> MaybeFatalIo<T> for std::io::Result<T> {
/// Terminate the process if the result is an error of a fatal type, else pass it through
///
/// This is appropriate for writes, where we typically want to die on EIO/ACCES etc, but
/// not on ENOSPC.
fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
if let Err(e) = &self {
if is_fatal_io_error(e) {
on_fatal_io_error(e, context);
}
}
self
}
/// Terminate the process on any I/O error.
///
/// This is appropriate for reads on files that we know exist: they should always work.
fn fatal_err(self, context: &str) -> T {
match self {
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
Ok(v) => v,
Err(e) => {
on_fatal_io_error(&e, context);
}
}
}
}
@@ -284,15 +326,13 @@ impl VirtualFile {
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> Result<(), CrashsafeOverwriteError> {
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
}
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
@@ -301,31 +341,20 @@ impl VirtualFile {
// we bail out instead of causing damage.
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
file.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
.await?;
file.write_all(content).await?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
final_parent_dirfd.sync_all().await?;
Ok(())
}

View File

@@ -1,8 +1,10 @@
from contextlib import closing
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.pageserver.utils import wait_tenant_status_404
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
#
@@ -18,6 +20,8 @@ from fixtures.pg_version import PgVersion
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline
start_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0])
with closing(env.pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table huge (i int, j int);")
@@ -31,6 +35,13 @@ def test_bulk_insert(neon_with_baseline: PgCompare):
env.report_peak_memory_use()
env.report_size()
# Report amount of wal written. Useful for comparing vanilla wal format vs
# neon wal format, measuring neon write amplification, etc.
end_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0])
wal_written_bytes = end_lsn - start_lsn
wal_written_mb = round(wal_written_bytes / (1024 * 1024))
env.zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
# When testing neon, also check how long it takes the pageserver to reingest the
# wal from safekeepers. If this number is close to total runtime, then the pageserver
# is the bottleneck.