diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 4d66a54c98..8a5aaf2ceb 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -60,8 +60,8 @@ impl OnceCell { /// Initialization is panic-safe and cancellation-safe. pub async fn get_or_init(&self, factory: F) -> Result, E> where - F: FnOnce() -> Fut, - Fut: std::future::Future>, + F: FnOnce(InitPermit) -> Fut, + Fut: std::future::Future>, { let sem = { let guard = self.inner.lock().unwrap(); @@ -72,28 +72,55 @@ impl OnceCell { }; let permit = sem.acquire_owned().await; - if permit.is_err() { - let guard = self.inner.lock().unwrap(); - assert!( - guard.value.is_some(), - "semaphore got closed, must be initialized" - ); - return Ok(Guard(guard)); - } else { - // now we try - let value = factory().await?; - let mut guard = self.inner.lock().unwrap(); - assert!( - guard.value.is_none(), - "we won permit, must not be initialized" - ); - guard.value = Some(value); - guard.init_semaphore.close(); - Ok(Guard(guard)) + match permit { + Ok(permit) => { + let permit = InitPermit(permit); + let (value, _permit) = factory(permit).await?; + + let guard = self.inner.lock().unwrap(); + + Ok(Self::set0(value, guard)) + } + Err(_closed) => { + let guard = self.inner.lock().unwrap(); + assert!( + guard.value.is_some(), + "semaphore got closed, must be initialized" + ); + return Ok(Guard(guard)); + } } } + /// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used + /// to complete initializing the inner value. + /// + /// # Panics + /// + /// If the inner has already been initialized. + pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> { + // cannot assert that this permit is for self.inner.semaphore + let guard = self.inner.lock().unwrap(); + + if guard.init_semaphore.try_acquire().is_ok() { + drop(guard); + panic!("semaphore is of wrong origin"); + } + + Self::set0(value, guard) + } + + fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner>) -> Guard<'_, T> { + if guard.value.is_some() { + drop(guard); + unreachable!("we won permit, must not be initialized"); + } + guard.value = Some(value); + guard.init_semaphore.close(); + Guard(guard) + } + /// Returns a guard to an existing initialized value, if any. pub fn get(&self) -> Option> { let guard = self.inner.lock().unwrap(); @@ -135,7 +162,7 @@ impl<'a, T> Guard<'a, T> { /// /// The permit will be on a semaphore part of the new internal value, and any following /// [`OnceCell::get_or_init`] will wait on it to complete. - pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) { + pub fn take_and_deinit(&mut self) -> (T, InitPermit) { let mut swapped = Inner::default(); let permit = swapped .init_semaphore @@ -145,11 +172,14 @@ impl<'a, T> Guard<'a, T> { std::mem::swap(&mut *self.0, &mut swapped); swapped .value - .map(|v| (v, permit)) + .map(|v| (v, InitPermit(permit))) .expect("guard is not created unless value has been initialized") } } +/// Type held by OnceCell (de)initializing task. +pub struct InitPermit(tokio::sync::OwnedSemaphorePermit); + #[cfg(test)] mod tests { use super::*; @@ -185,11 +215,11 @@ mod tests { barrier.wait().await; let won = { let g = cell - .get_or_init(|| { + .get_or_init(|permit| { counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed); async { counters.future_polled.fetch_add(1, Ordering::Relaxed); - Ok::<_, Infallible>(i) + Ok::<_, Infallible>((i, permit)) } }) .await @@ -243,7 +273,7 @@ mod tests { deinitialization_started.wait().await; let started_at = tokio::time::Instant::now(); - cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) }) + cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) .await .unwrap(); @@ -258,18 +288,32 @@ mod tests { assert_eq!(*cell.get().unwrap(), reinit); } + #[test] + fn reinit_with_deinit_permit() { + let cell = Arc::new(OnceCell::new(42)); + + let (mol, permit) = cell.get().unwrap().take_and_deinit(); + cell.set(5, permit); + assert_eq!(*cell.get().unwrap(), 5); + + let (five, permit) = cell.get().unwrap().take_and_deinit(); + assert_eq!(5, five); + cell.set(mol, permit); + assert_eq!(*cell.get().unwrap(), 42); + } + #[tokio::test] async fn initialization_attemptable_until_ok() { let cell = OnceCell::default(); for _ in 0..10 { - cell.get_or_init(|| async { Err("whatever error") }) + cell.get_or_init(|_permit| async { Err("whatever error") }) .await .unwrap_err(); } let g = cell - .get_or_init(|| async { Ok::<_, Infallible>("finally success") }) + .get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) .await .unwrap(); assert_eq!(*g, "finally success"); @@ -281,11 +325,11 @@ mod tests { let barrier = tokio::sync::Barrier::new(2); - let initializer = cell.get_or_init(|| async { + let initializer = cell.get_or_init(|permit| async { barrier.wait().await; futures::future::pending::<()>().await; - Ok::<_, Infallible>("never reached") + Ok::<_, Infallible>(("never reached", permit)) }); tokio::select! { @@ -298,7 +342,7 @@ mod tests { assert!(cell.get().is_none()); let g = cell - .get_or_init(|| async { Ok::<_, Infallible>("now initialized") }) + .get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) .await .unwrap(); assert_eq!(*g, "now initialized"); diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 22efa23f10..7b2db929fa 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -10,6 +10,7 @@ use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::metrics; use crate::tenant::remote_timeline_client::remote_layer_path; use crate::tenant::remote_timeline_client::remote_timeline_path; +use crate::virtual_file::MaybeFatalIo; use crate::virtual_file::VirtualFile; use anyhow::Context; use camino::Utf8PathBuf; @@ -271,7 +272,9 @@ impl DeletionHeader { let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes) .await - .map_err(Into::into) + .maybe_fatal_err("save deletion header")?; + + Ok(()) } } @@ -360,6 +363,7 @@ impl DeletionList { let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes) .await + .maybe_fatal_err("save deletion list") .map_err(Into::into) } } diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index 6727957b2a..28daae2da5 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX; use crate::metrics; use crate::tenant::remote_timeline_client::remote_layer_path; use crate::tenant::storage_layer::LayerFileName; +use crate::virtual_file::on_fatal_io_error; +use crate::virtual_file::MaybeFatalIo; // The number of keys in a DeletionList before we will proactively persist it // (without reaching a flush deadline). This aims to deliver objects of the order @@ -195,7 +197,7 @@ impl ListWriter { debug!("Deletion header {header_path} not found, first start?"); Ok(None) } else { - Err(anyhow::anyhow!(e)) + on_fatal_io_error(&e, "reading deletion header"); } } } @@ -216,16 +218,9 @@ impl ListWriter { self.pending.sequence = validated_sequence + 1; let deletion_directory = self.conf.deletion_prefix(); - let mut dir = match tokio::fs::read_dir(&deletion_directory).await { - Ok(d) => d, - Err(e) => { - warn!("Failed to open deletion list directory {deletion_directory}: {e:#}"); - - // Give up: if we can't read the deletion list directory, we probably can't - // write lists into it later, so the queue won't work. - return Err(e.into()); - } - }; + let mut dir = tokio::fs::read_dir(&deletion_directory) + .await + .fatal_err("read deletion directory"); let list_name_pattern = Regex::new("(?[a-zA-Z0-9]{16})-(?[a-zA-Z0-9]{2}).list").unwrap(); @@ -233,7 +228,7 @@ impl ListWriter { let temp_extension = format!(".{TEMP_SUFFIX}"); let header_path = self.conf.deletion_header_path(); let mut seqs: Vec = Vec::new(); - while let Some(dentry) = dir.next_entry().await? { + while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") { let file_name = dentry.file_name(); let dentry_str = file_name.to_string_lossy(); @@ -246,11 +241,9 @@ impl ListWriter { info!("Cleaning up temporary file {dentry_str}"); let absolute_path = deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path")); - if let Err(e) = tokio::fs::remove_file(&absolute_path).await { - // Non-fatal error: we will just leave the file behind but not - // try and load it. - warn!("Failed to clean up temporary file {absolute_path}: {e:#}"); - } + tokio::fs::remove_file(&absolute_path) + .await + .fatal_err("delete temp file"); continue; } @@ -290,7 +283,9 @@ impl ListWriter { for s in seqs { let list_path = self.conf.deletion_list_path(s); - let list_bytes = tokio::fs::read(&list_path).await?; + let list_bytes = tokio::fs::read(&list_path) + .await + .fatal_err("read deletion list"); let mut deletion_list = match serde_json::from_slice::(&list_bytes) { Ok(l) => l, diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index a2cbfb9dc7..72bdbdefd6 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -28,6 +28,7 @@ use crate::config::PageServerConf; use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::control_plane_client::RetryForeverError; use crate::metrics; +use crate::virtual_file::MaybeFatalIo; use super::deleter::DeleterMessage; use super::DeletionHeader; @@ -287,16 +288,9 @@ where async fn cleanup_lists(&mut self, list_paths: Vec) { for list_path in list_paths { debug!("Removing deletion list {list_path}"); - - if let Err(e) = tokio::fs::remove_file(&list_path).await { - // Unexpected: we should have permissions and nothing else should - // be touching these files. We will leave the file behind. Subsequent - // pageservers will try and load it again: hopefully whatever storage - // issue (probably permissions) has been fixed by then. - tracing::error!("Failed to delete {list_path}: {e:#}"); - metrics::DELETION_QUEUE.unexpected_errors.inc(); - break; - } + tokio::fs::remove_file(&list_path) + .await + .fatal_err("remove deletion list"); } } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index aaba9bd933..b320c02f9b 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -337,31 +337,39 @@ enum ResidentOrWantedEvicted { } impl ResidentOrWantedEvicted { - fn get(&self) -> Option> { + fn get_and_upgrade(&mut self) -> Option<(Arc, bool)> { match self { - ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()), + ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)), ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() { Some(strong) => { LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses(); - Some(strong) + + *self = ResidentOrWantedEvicted::Resident(strong.clone()); + + Some((strong, true)) } None => None, }, } } + /// When eviction is first requested, drop down to holding a [`Weak`]. /// - /// Returns `true` if this was the first time eviction was requested. - fn downgrade(&mut self) -> bool { + /// Returns `Some` if this was the first time eviction was requested. Care should be taken to + /// drop the possibly last strong reference outside of the mutex of + /// heavier_once_cell::OnceCell. + fn downgrade(&mut self) -> Option> { match self { ResidentOrWantedEvicted::Resident(strong) => { let weak = Arc::downgrade(strong); - *self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version); - // returning the weak is not useful, because the drop could had already ran with - // the replacement above, and that will take care of cleaning the Option we are in - true + let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version); + std::mem::swap(self, &mut temp); + match temp { + ResidentOrWantedEvicted::Resident(strong) => Some(strong), + ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"), + } } - ResidentOrWantedEvicted::WantedEvicted(..) => false, + ResidentOrWantedEvicted::WantedEvicted(..) => None, } } } @@ -563,20 +571,22 @@ impl LayerInner { let mut rx = self.status.subscribe(); - let res = - self.wanted_evicted - .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed); + let strong = { + match self.inner.get() { + Some(mut either) => { + self.wanted_evicted.store(true, Ordering::Relaxed); + either.downgrade() + } + None => return Err(EvictionError::NotFound), + } + }; - if res.is_ok() { + if strong.is_some() { + // drop the DownloadedLayer outside of the holding the guard + drop(strong); LAYER_IMPL_METRICS.inc_started_evictions(); } - if self.get().is_none() { - // it was not evictable in the first place - // our store to the wanted_evicted does not matter; it will be reset by next download - return Err(EvictionError::NotFound); - } - match rx.recv().await { Ok(Status::Evicted) => Ok(()), Ok(Status::Downloaded) => Err(EvictionError::Downloaded), @@ -590,7 +600,8 @@ impl LayerInner { // // use however late (compared to the initial expressing of wanted) as the // "outcome" now - match self.get() { + LAYER_IMPL_METRICS.inc_broadcast_lagged(); + match self.inner.get() { Some(_) => Err(EvictionError::Downloaded), None => Ok(()), } @@ -605,8 +616,10 @@ impl LayerInner { allow_download: bool, ctx: Option<&RequestContext>, ) -> Result, DownloadError> { + let mut init_permit = None; + loop { - let download = move || async move { + let download = move |permit| async move { // disable any scheduled but not yet running eviction deletions for this let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); @@ -627,7 +640,11 @@ impl LayerInner { .await .map_err(DownloadError::PreStatFailed)?; - if let Some(reason) = needs_download { + let permit = if let Some(reason) = needs_download { + if let NeedsDownload::NotFile(ft) = reason { + return Err(DownloadError::NotFile(ft)); + } + // only reset this after we've decided we really need to download. otherwise it'd // be impossible to mark cancelled downloads for eviction, like one could imagine // we would like to do for prefetching which was not needed. @@ -649,12 +666,14 @@ impl LayerInner { return Err(DownloadError::DownloadRequired); } - self.spawn_download_and_wait(timeline).await?; + self.spawn_download_and_wait(timeline, permit).await? } else { // the file is present locally, probably by a previous but cancelled call to // get_or_maybe_download. alternatively we might be running without remote storage. LAYER_IMPL_METRICS.inc_init_needed_no_download(); - } + + permit + }; let res = Arc::new(DownloadedLayer { owner: Arc::downgrade(self), @@ -667,19 +686,55 @@ impl LayerInner { LayerResidenceEventReason::ResidenceChange, ); - Ok(ResidentOrWantedEvicted::Resident(res)) + Ok((ResidentOrWantedEvicted::Resident(res), permit)) }; - let locked = self.inner.get_or_init(download).await?; - - if let Some(strong) = Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted) - { + if let Some(init_permit) = init_permit.take() { + // use the already held initialization permit because it is impossible to hit the + // below paths anymore essentially limiting the max loop iterations to 2. + let (value, init_permit) = download(init_permit).await?; + let mut guard = self.inner.set(value, init_permit); + let (strong, _upgraded) = guard + .get_and_upgrade() + .expect("init creates strong reference, we held the init permit"); return Ok(strong); } - // the situation in which we might need to retry is that our init was ready - // immediatedly, but the DownloadedLayer had been dropped BUT failed to complete - // Self::evict_blocking + let (weak, permit) = { + let mut locked = self.inner.get_or_init(download).await?; + + if let Some((strong, upgraded)) = locked.get_and_upgrade() { + if upgraded { + // when upgraded back, the Arc is still available, but + // previously a `evict_and_wait` was received. + self.wanted_evicted.store(false, Ordering::Relaxed); + + // error out any `evict_and_wait` + drop(self.status.send(Status::Downloaded)); + LAYER_IMPL_METRICS + .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess); + } + + return Ok(strong); + } else { + // path to here: the evict_blocking is stuck on spawn_blocking queue. + // + // reset the contents, deactivating the eviction and causing a + // EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed. + locked.take_and_deinit() + } + }; + + // unlock first, then drop the weak, but because upgrade failed, we + // know it cannot be a problem. + + assert!( + matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)), + "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug" + ); + + init_permit = Some(permit); + LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download(); } } @@ -714,10 +769,12 @@ impl LayerInner { async fn spawn_download_and_wait( self: &Arc, timeline: Arc, - ) -> Result<(), DownloadError> { + permit: heavier_once_cell::InitPermit, + ) -> Result { let task_name = format!("download layer {}", self); let (tx, rx) = tokio::sync::oneshot::channel(); + // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot // block tenant::mgr::remove_tenant_from_memory. @@ -751,9 +808,9 @@ impl LayerInner { } }; - if let Err(res) = tx.send(result) { + if let Err(res) = tx.send((result, permit)) { match res { - Ok(()) => { + (Ok(()), _) => { // our caller is cancellation safe so this is fine; if someone // else requests the layer, they'll find it already downloaded // or redownload. @@ -764,7 +821,7 @@ impl LayerInner { tracing::info!("layer file download completed after requester had cancelled"); LAYER_IMPL_METRICS.inc_download_completed_without_requester(); }, - Err(e) => { + (Err(e), _) => { // our caller is cancellation safe, but we might be racing with // another attempt to initialize. before we have cancellation // token support: these attempts should converge regardless of @@ -780,7 +837,7 @@ impl LayerInner { .in_current_span(), ); match rx.await { - Ok(Ok(())) => { + Ok((Ok(()), permit)) => { if let Some(reason) = self .needs_download() .await @@ -792,9 +849,10 @@ impl LayerInner { self.consecutive_failures.store(0, Ordering::Relaxed); - Ok(()) + Ok(permit) } - Ok(Err(e)) => { + Ok((Err(e), _permit)) => { + // FIXME: this should be with the spawned task and be cancellation sensitive let consecutive_failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed); tracing::error!(consecutive_failures, "layer file download failed: {e:#}"); @@ -812,33 +870,6 @@ impl LayerInner { } } - /// Access the current state without waiting for the file to be downloaded. - /// - /// Requires that we've initialized to state which is respective to the - /// actual residency state. - fn get(&self) -> Option> { - let locked = self.inner.get(); - Self::get_or_apply_evictedness(locked, &self.wanted_evicted) - } - - fn get_or_apply_evictedness( - guard: Option>, - wanted_evicted: &AtomicBool, - ) -> Option> { - if let Some(mut x) = guard { - if let Some(won) = x.get() { - // there are no guarantees that we will always get to observe a concurrent call - // to evict - if wanted_evicted.load(Ordering::Acquire) { - x.downgrade(); - } - return Some(won); - } - } - - None - } - async fn needs_download(&self) -> Result, std::io::Error> { match tokio::fs::metadata(&self.path).await { Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()), @@ -858,7 +889,7 @@ impl LayerInner { fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> { // in future, this should include sha2-256 validation of the file. if !m.is_file() { - Err(NeedsDownload::NotFile) + Err(NeedsDownload::NotFile(m.file_type())) } else if m.len() != self.desc.file_size { Err(NeedsDownload::WrongSize { actual: m.len(), @@ -872,7 +903,9 @@ impl LayerInner { fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.desc.filename().file_name(); - let remote = self.get().is_none(); + // this is not accurate: we could have the file locally but there was a cancellation + // and now we are not in sync, or we are currently downloading it. + let remote = self.inner.get().is_none(); let access_stats = self.access_stats.as_api_model(reset); @@ -1055,6 +1088,8 @@ enum DownloadError { ContextAndConfigReallyDeniesDownloads, #[error("downloading is really required but not allowed by this method")] DownloadRequired, + #[error("layer path exists, but it is not a file: {0:?}")] + NotFile(std::fs::FileType), /// Why no error here? Because it will be reported by page_service. We should had also done /// retries already. #[error("downloading evicted layer file failed")] @@ -1070,7 +1105,7 @@ enum DownloadError { #[derive(Debug, PartialEq)] pub(crate) enum NeedsDownload { NotFound, - NotFile, + NotFile(std::fs::FileType), WrongSize { actual: u64, expected: u64 }, } @@ -1078,7 +1113,7 @@ impl std::fmt::Display for NeedsDownload { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { NeedsDownload::NotFound => write!(f, "file was not found"), - NeedsDownload::NotFile => write!(f, "path is not a file"), + NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"), NeedsDownload::WrongSize { actual, expected } => { write!(f, "file size mismatch {actual} vs. {expected}") } @@ -1456,6 +1491,13 @@ impl LayerImplMetrics { .unwrap() .inc(); } + + fn inc_broadcast_lagged(&self) { + self.rare_counters + .get_metric_with_label_values(&["broadcast_lagged"]) + .unwrap() + .inc(); + } } enum EvictionCancelled { @@ -1467,6 +1509,8 @@ enum EvictionCancelled { AlreadyReinitialized, /// Not evicted because of a pending reinitialization LostToDownload, + /// After eviction, there was a new layer access which cancelled the eviction. + UpgradedBackOnAccess, } impl EvictionCancelled { @@ -1479,6 +1523,7 @@ impl EvictionCancelled { EvictionCancelled::RemoveFailed => "remove_failed", EvictionCancelled::AlreadyReinitialized => "already_reinitialized", EvictionCancelled::LostToDownload => "lost_to_download", + EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access", } } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index a2e8f30e15..b58b883ab6 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,6 +19,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{RwLock, RwLockWriteGuard}; +use utils::fs_ext; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -173,37 +174,78 @@ impl OpenFiles { } } -#[derive(Debug, thiserror::Error)] -pub enum CrashsafeOverwriteError { - #[error("final path has no parent dir")] - FinalPathHasNoParentDir, - #[error("remove tempfile")] - RemovePreviousTempfile(#[source] std::io::Error), - #[error("create tempfile")] - CreateTempfile(#[source] std::io::Error), - #[error("write tempfile")] - WriteContents(#[source] std::io::Error), - #[error("sync tempfile")] - SyncTempfile(#[source] std::io::Error), - #[error("rename tempfile to final path")] - RenameTempfileToFinalPath(#[source] std::io::Error), - #[error("open final path parent dir")] - OpenFinalPathParentDir(#[source] std::io::Error), - #[error("sync final path parent dir")] - SyncFinalPathParentDir(#[source] std::io::Error), +/// Identify error types that should alwways terminate the process. Other +/// error types may be elegible for retry. +pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool { + use nix::errno::Errno::*; + match e.raw_os_error().map(nix::errno::from_i32) { + Some(EIO) => { + // Terminate on EIO because we no longer trust the device to store + // data safely, or to uphold persistence guarantees on fsync. + true + } + Some(EROFS) => { + // Terminate on EROFS because a filesystem is usually remounted + // readonly when it has experienced some critical issue, so the same + // logic as EIO applies. + true + } + Some(EACCES) => { + // Terminate on EACCESS because we should always have permissions + // for our own data dir: if we don't, then we can't do our job and + // need administrative intervention to fix permissions. Terminating + // is the best way to make sure we stop cleanly rather than going + // into infinite retry loops, and will make it clear to the outside + // world that we need help. + true + } + _ => { + // Treat all other local file I/O errors are retryable. This includes: + // - ENOSPC: we stay up and wait for eviction to free some space + // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue + // - WriteZero, Interrupted: these are used internally VirtualFile + false + } + } } -impl CrashsafeOverwriteError { - /// Returns true iff the new contents are durably stored. - pub fn are_new_contents_durable(&self) -> bool { + +/// Call this when the local filesystem gives us an error with an external +/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either +/// bad storage or bad configuration, and we can't fix that from inside +/// a running process. +pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! { + tracing::error!("Fatal I/O error: {e}: {context})"); + std::process::abort(); +} + +pub(crate) trait MaybeFatalIo { + fn maybe_fatal_err(self, context: &str) -> std::io::Result; + fn fatal_err(self, context: &str) -> T; +} + +impl MaybeFatalIo for std::io::Result { + /// Terminate the process if the result is an error of a fatal type, else pass it through + /// + /// This is appropriate for writes, where we typically want to die on EIO/ACCES etc, but + /// not on ENOSPC. + fn maybe_fatal_err(self, context: &str) -> std::io::Result { + if let Err(e) = &self { + if is_fatal_io_error(e) { + on_fatal_io_error(e, context); + } + } + self + } + + /// Terminate the process on any I/O error. + /// + /// This is appropriate for reads on files that we know exist: they should always work. + fn fatal_err(self, context: &str) -> T { match self { - Self::FinalPathHasNoParentDir => false, - Self::RemovePreviousTempfile(_) => false, - Self::CreateTempfile(_) => false, - Self::WriteContents(_) => false, - Self::SyncTempfile(_) => false, - Self::RenameTempfileToFinalPath(_) => false, - Self::OpenFinalPathParentDir(_) => false, - Self::SyncFinalPathParentDir(_) => true, + Ok(v) => v, + Err(e) => { + on_fatal_io_error(&e, context); + } } } } @@ -284,15 +326,13 @@ impl VirtualFile { final_path: &Utf8Path, tmp_path: &Utf8Path, content: &[u8], - ) -> Result<(), CrashsafeOverwriteError> { + ) -> std::io::Result<()> { let Some(final_path_parent) = final_path.parent() else { - return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir); + return Err(std::io::Error::from_raw_os_error( + nix::errno::Errno::EINVAL as i32, + )); }; - match std::fs::remove_file(tmp_path) { - Ok(()) => {} - Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} - Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)), - } + std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; let mut file = Self::open_with_options( tmp_path, OpenOptions::new() @@ -301,31 +341,20 @@ impl VirtualFile { // we bail out instead of causing damage. .create_new(true), ) - .await - .map_err(CrashsafeOverwriteError::CreateTempfile)?; - file.write_all(content) - .await - .map_err(CrashsafeOverwriteError::WriteContents)?; - file.sync_all() - .await - .map_err(CrashsafeOverwriteError::SyncTempfile)?; + .await?; + file.write_all(content).await?; + file.sync_all().await?; drop(file); // before the rename, that's important! // renames are atomic - std::fs::rename(tmp_path, final_path) - .map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?; + std::fs::rename(tmp_path, final_path)?; // Only open final path parent dirfd now, so that this operation only // ever holds one VirtualFile fd at a time. That's important because // the current `find_victim_slot` impl might pick the same slot for both // VirtualFile., and it eventually does a blocking write lock instead of // try_lock. let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)) - .await - .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; - final_parent_dirfd - .sync_all() - .await - .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?; + Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; + final_parent_dirfd.sync_all().await?; Ok(()) } diff --git a/test_runner/performance/test_bulk_insert.py b/test_runner/performance/test_bulk_insert.py index 62301f3919..46acec0f63 100644 --- a/test_runner/performance/test_bulk_insert.py +++ b/test_runner/performance/test_bulk_insert.py @@ -1,8 +1,10 @@ from contextlib import closing +from fixtures.benchmark_fixture import MetricReport from fixtures.compare_fixtures import NeonCompare, PgCompare from fixtures.pageserver.utils import wait_tenant_status_404 from fixtures.pg_version import PgVersion +from fixtures.types import Lsn # @@ -18,6 +20,8 @@ from fixtures.pg_version import PgVersion def test_bulk_insert(neon_with_baseline: PgCompare): env = neon_with_baseline + start_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: cur.execute("create table huge (i int, j int);") @@ -31,6 +35,13 @@ def test_bulk_insert(neon_with_baseline: PgCompare): env.report_peak_memory_use() env.report_size() + # Report amount of wal written. Useful for comparing vanilla wal format vs + # neon wal format, measuring neon write amplification, etc. + end_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + wal_written_bytes = end_lsn - start_lsn + wal_written_mb = round(wal_written_bytes / (1024 * 1024)) + env.zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM) + # When testing neon, also check how long it takes the pageserver to reingest the # wal from safekeepers. If this number is close to total runtime, then the pageserver # is the bottleneck.