From e82d1ad6b8f00544d20614788f0f09fb02fb096d Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 1 Nov 2023 17:38:32 +0200 Subject: [PATCH 1/6] fix(layer): reinit on access before eviction happens (#5743) Right before merging, I added a loop to `fn LayerInner::get_or_maybe_download`, which was always supposed to be there. However, I had forgotten to restart initialization instead of waiting for the eviction to happen, as required by the original design goal of "eviction should always lose to redownload (or init)". This was wrong. After this fix, if the `spawn_blocking` queue is blocked on something, nothing bad will happen. Part of #5737. --- pageserver/src/tenant/storage_layer/layer.rs | 130 +++++++++++-------- 1 file changed, 75 insertions(+), 55 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index aaba9bd933..761fe311c6 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -337,31 +337,41 @@ enum ResidentOrWantedEvicted { } impl ResidentOrWantedEvicted { - fn get(&self) -> Option<Arc<DownloadedLayer>> { + /// If `Some` is returned, the ResidentOrWantedEvicted has been upgraded back from + /// `ResidentOrWantedEvicted::WantedEvicted` to `ResidentOrWantedEvicted::Resident`. + fn get_and_upgrade(&mut self) -> Option<Arc<DownloadedLayer>> { match self { ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()), ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() { Some(strong) => { LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses(); + + *self = ResidentOrWantedEvicted::Resident(strong.clone()); + Some(strong) } None => None, }, } } + /// When eviction is first requested, drop down to holding a [`Weak`]. /// - /// Returns `true` if this was the first time eviction was requested. - fn downgrade(&mut self) -> bool { + /// Returns `Some` if this was the first time eviction was requested. Care should be taken to + /// drop the possibly last strong reference outside of the mutex of + /// heavier_once_cell::OnceCell. + fn downgrade(&mut self) -> Option<Arc<DownloadedLayer>> { match self { ResidentOrWantedEvicted::Resident(strong) => { let weak = Arc::downgrade(strong); - *self = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version); - // returning the weak is not useful, because the drop could had already ran with - // the replacement above, and that will take care of cleaning the Option we are in - true + let mut temp = ResidentOrWantedEvicted::WantedEvicted(weak, strong.version); + std::mem::swap(self, &mut temp); + match temp { + ResidentOrWantedEvicted::Resident(strong) => Some(strong), + ResidentOrWantedEvicted::WantedEvicted(..) => unreachable!("just swapped"), + } } - ResidentOrWantedEvicted::WantedEvicted(..) => false, + ResidentOrWantedEvicted::WantedEvicted(..)
=> None, } } } @@ -563,20 +573,22 @@ impl LayerInner { let mut rx = self.status.subscribe(); - let res = - self.wanted_evicted - .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed); + let strong = { + match self.inner.get() { + Some(mut either) => { + self.wanted_evicted.store(true, Ordering::Relaxed); + either.downgrade() + } + None => return Err(EvictionError::NotFound), + } + }; - if res.is_ok() { + if strong.is_some() { + // drop the DownloadedLayer outside of the holding the guard + drop(strong); LAYER_IMPL_METRICS.inc_started_evictions(); } - if self.get().is_none() { - // it was not evictable in the first place - // our store to the wanted_evicted does not matter; it will be reset by next download - return Err(EvictionError::NotFound); - } - match rx.recv().await { Ok(Status::Evicted) => Ok(()), Ok(Status::Downloaded) => Err(EvictionError::Downloaded), @@ -590,7 +602,8 @@ impl LayerInner { // // use however late (compared to the initial expressing of wanted) as the // "outcome" now - match self.get() { + LAYER_IMPL_METRICS.inc_broadcast_lagged(); + match self.inner.get() { Some(_) => Err(EvictionError::Downloaded), None => Ok(()), } @@ -605,6 +618,8 @@ impl LayerInner { allow_download: bool, ctx: Option<&RequestContext>, ) -> Result, DownloadError> { + let mut permit = None; + loop { let download = move || async move { // disable any scheduled but not yet running eviction deletions for this @@ -622,6 +637,8 @@ impl LayerInner { // check if we really need to be downloaded; could have been already downloaded by a // cancelled previous attempt. + // + // FIXME: what if it's a directory? that is currently needs_download == true let needs_download = self .needs_download() .await @@ -670,16 +687,37 @@ impl LayerInner { Ok(ResidentOrWantedEvicted::Resident(res)) }; - let locked = self.inner.get_or_init(download).await?; + let (weak, _permit) = { + // should we be able to give the permit to the `get_or_init`? would make sense. + drop(permit.take()); + let mut locked = self.inner.get_or_init(download).await?; - if let Some(strong) = Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted) - { - return Ok(strong); - } + if let Some(strong) = locked.get_and_upgrade() { + self.wanted_evicted.store(false, Ordering::Relaxed); + + // error out any `evict_and_wait` + drop(self.status.send(Status::Downloaded)); + + return Ok(strong); + } else { + // path to here: the evict_blocking is stuck on spawn_blocking queue. + // + // reset the contents, deactivating the eviction and causing a + // EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed. + locked.take_and_deinit() + } + }; + + // unlock first, then drop the weak, but because upgrade failed, we + // know it cannot be a problem. + + assert!( + matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)), + "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug" + ); + + permit = Some(_permit); - // the situation in which we might need to retry is that our init was ready - // immediatedly, but the DownloadedLayer had been dropped BUT failed to complete - // Self::evict_blocking LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download(); } } @@ -812,33 +850,6 @@ impl LayerInner { } } - /// Access the current state without waiting for the file to be downloaded. - /// - /// Requires that we've initialized to state which is respective to the - /// actual residency state. 
- fn get(&self) -> Option> { - let locked = self.inner.get(); - Self::get_or_apply_evictedness(locked, &self.wanted_evicted) - } - - fn get_or_apply_evictedness( - guard: Option>, - wanted_evicted: &AtomicBool, - ) -> Option> { - if let Some(mut x) = guard { - if let Some(won) = x.get() { - // there are no guarantees that we will always get to observe a concurrent call - // to evict - if wanted_evicted.load(Ordering::Acquire) { - x.downgrade(); - } - return Some(won); - } - } - - None - } - async fn needs_download(&self) -> Result, std::io::Error> { match tokio::fs::metadata(&self.path).await { Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()), @@ -872,7 +883,9 @@ impl LayerInner { fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.desc.filename().file_name(); - let remote = self.get().is_none(); + // this is not accurate: we could have the file locally but there was a cancellation + // and now we are not in sync, or we are currently downloading it. + let remote = self.inner.get().is_none(); let access_stats = self.access_stats.as_api_model(reset); @@ -1456,6 +1469,13 @@ impl LayerImplMetrics { .unwrap() .inc(); } + + fn inc_broadcast_lagged(&self) { + self.rare_counters + .get_metric_with_label_values(&["broadcast_lagged"]) + .unwrap() + .inc(); + } } enum EvictionCancelled { From 0b790b6d0058dcc47e2e56160a7f2c3b595483d9 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Wed, 1 Nov 2023 17:02:58 -0400 Subject: [PATCH 2/6] Record wal size in import benchmark (#5755) --- test_runner/performance/test_bulk_insert.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test_runner/performance/test_bulk_insert.py b/test_runner/performance/test_bulk_insert.py index 62301f3919..46acec0f63 100644 --- a/test_runner/performance/test_bulk_insert.py +++ b/test_runner/performance/test_bulk_insert.py @@ -1,8 +1,10 @@ from contextlib import closing +from fixtures.benchmark_fixture import MetricReport from fixtures.compare_fixtures import NeonCompare, PgCompare from fixtures.pageserver.utils import wait_tenant_status_404 from fixtures.pg_version import PgVersion +from fixtures.types import Lsn # @@ -18,6 +20,8 @@ from fixtures.pg_version import PgVersion def test_bulk_insert(neon_with_baseline: PgCompare): env = neon_with_baseline + start_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + with closing(env.pg.connect()) as conn: with conn.cursor() as cur: cur.execute("create table huge (i int, j int);") @@ -31,6 +35,13 @@ def test_bulk_insert(neon_with_baseline: PgCompare): env.report_peak_memory_use() env.report_size() + # Report amount of wal written. Useful for comparing vanilla wal format vs + # neon wal format, measuring neon write amplification, etc. + end_lsn = Lsn(env.pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + wal_written_bytes = end_lsn - start_lsn + wal_written_mb = round(wal_written_bytes / (1024 * 1024)) + env.zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM) + # When testing neon, also check how long it takes the pageserver to reingest the # wal from safekeepers. If this number is close to total runtime, then the pageserver # is the bottleneck. 
From 2dca4c03fc8d12f83398408504eee7c404870d95 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 2 Nov 2023 10:06:32 +0200 Subject: [PATCH 3/6] feat(layer): cancellable get_or_maybe_download (#5744) With the layer implementation as was done in #4938, it is possible via cancellation to cause two concurrent downloads on the same path, due to how `RemoteTimelineClient::download_remote_layer` does tempfiles. Thread the init semaphore through the spawned task of downloading to make this impossible to happen. --- libs/utils/src/sync/heavier_once_cell.rs | 104 +++++++++++++------ pageserver/src/tenant/storage_layer/layer.rs | 48 ++++++--- 2 files changed, 105 insertions(+), 47 deletions(-) diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 4d66a54c98..8a5aaf2ceb 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -60,8 +60,8 @@ impl OnceCell { /// Initialization is panic-safe and cancellation-safe. pub async fn get_or_init(&self, factory: F) -> Result, E> where - F: FnOnce() -> Fut, - Fut: std::future::Future>, + F: FnOnce(InitPermit) -> Fut, + Fut: std::future::Future>, { let sem = { let guard = self.inner.lock().unwrap(); @@ -72,28 +72,55 @@ impl OnceCell { }; let permit = sem.acquire_owned().await; - if permit.is_err() { - let guard = self.inner.lock().unwrap(); - assert!( - guard.value.is_some(), - "semaphore got closed, must be initialized" - ); - return Ok(Guard(guard)); - } else { - // now we try - let value = factory().await?; - let mut guard = self.inner.lock().unwrap(); - assert!( - guard.value.is_none(), - "we won permit, must not be initialized" - ); - guard.value = Some(value); - guard.init_semaphore.close(); - Ok(Guard(guard)) + match permit { + Ok(permit) => { + let permit = InitPermit(permit); + let (value, _permit) = factory(permit).await?; + + let guard = self.inner.lock().unwrap(); + + Ok(Self::set0(value, guard)) + } + Err(_closed) => { + let guard = self.inner.lock().unwrap(); + assert!( + guard.value.is_some(), + "semaphore got closed, must be initialized" + ); + return Ok(Guard(guard)); + } } } + /// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used + /// to complete initializing the inner value. + /// + /// # Panics + /// + /// If the inner has already been initialized. + pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> { + // cannot assert that this permit is for self.inner.semaphore + let guard = self.inner.lock().unwrap(); + + if guard.init_semaphore.try_acquire().is_ok() { + drop(guard); + panic!("semaphore is of wrong origin"); + } + + Self::set0(value, guard) + } + + fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner>) -> Guard<'_, T> { + if guard.value.is_some() { + drop(guard); + unreachable!("we won permit, must not be initialized"); + } + guard.value = Some(value); + guard.init_semaphore.close(); + Guard(guard) + } + /// Returns a guard to an existing initialized value, if any. pub fn get(&self) -> Option> { let guard = self.inner.lock().unwrap(); @@ -135,7 +162,7 @@ impl<'a, T> Guard<'a, T> { /// /// The permit will be on a semaphore part of the new internal value, and any following /// [`OnceCell::get_or_init`] will wait on it to complete. 
- pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) { + pub fn take_and_deinit(&mut self) -> (T, InitPermit) { let mut swapped = Inner::default(); let permit = swapped .init_semaphore @@ -145,11 +172,14 @@ impl<'a, T> Guard<'a, T> { std::mem::swap(&mut *self.0, &mut swapped); swapped .value - .map(|v| (v, permit)) + .map(|v| (v, InitPermit(permit))) .expect("guard is not created unless value has been initialized") } } +/// Type held by OnceCell (de)initializing task. +pub struct InitPermit(tokio::sync::OwnedSemaphorePermit); + #[cfg(test)] mod tests { use super::*; @@ -185,11 +215,11 @@ mod tests { barrier.wait().await; let won = { let g = cell - .get_or_init(|| { + .get_or_init(|permit| { counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed); async { counters.future_polled.fetch_add(1, Ordering::Relaxed); - Ok::<_, Infallible>(i) + Ok::<_, Infallible>((i, permit)) } }) .await @@ -243,7 +273,7 @@ mod tests { deinitialization_started.wait().await; let started_at = tokio::time::Instant::now(); - cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) }) + cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) }) .await .unwrap(); @@ -258,18 +288,32 @@ mod tests { assert_eq!(*cell.get().unwrap(), reinit); } + #[test] + fn reinit_with_deinit_permit() { + let cell = Arc::new(OnceCell::new(42)); + + let (mol, permit) = cell.get().unwrap().take_and_deinit(); + cell.set(5, permit); + assert_eq!(*cell.get().unwrap(), 5); + + let (five, permit) = cell.get().unwrap().take_and_deinit(); + assert_eq!(5, five); + cell.set(mol, permit); + assert_eq!(*cell.get().unwrap(), 42); + } + #[tokio::test] async fn initialization_attemptable_until_ok() { let cell = OnceCell::default(); for _ in 0..10 { - cell.get_or_init(|| async { Err("whatever error") }) + cell.get_or_init(|_permit| async { Err("whatever error") }) .await .unwrap_err(); } let g = cell - .get_or_init(|| async { Ok::<_, Infallible>("finally success") }) + .get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) }) .await .unwrap(); assert_eq!(*g, "finally success"); @@ -281,11 +325,11 @@ mod tests { let barrier = tokio::sync::Barrier::new(2); - let initializer = cell.get_or_init(|| async { + let initializer = cell.get_or_init(|permit| async { barrier.wait().await; futures::future::pending::<()>().await; - Ok::<_, Infallible>("never reached") + Ok::<_, Infallible>(("never reached", permit)) }); tokio::select! 
{ @@ -298,7 +342,7 @@ mod tests { assert!(cell.get().is_none()); let g = cell - .get_or_init(|| async { Ok::<_, Infallible>("now initialized") }) + .get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) }) .await .unwrap(); assert_eq!(*g, "now initialized"); diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 761fe311c6..94edfa6fe0 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -618,10 +618,10 @@ impl LayerInner { allow_download: bool, ctx: Option<&RequestContext>, ) -> Result, DownloadError> { - let mut permit = None; + let mut init_permit = None; loop { - let download = move || async move { + let download = move |permit| async move { // disable any scheduled but not yet running eviction deletions for this let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); @@ -644,7 +644,7 @@ impl LayerInner { .await .map_err(DownloadError::PreStatFailed)?; - if let Some(reason) = needs_download { + let permit = if let Some(reason) = needs_download { // only reset this after we've decided we really need to download. otherwise it'd // be impossible to mark cancelled downloads for eviction, like one could imagine // we would like to do for prefetching which was not needed. @@ -666,12 +666,14 @@ impl LayerInner { return Err(DownloadError::DownloadRequired); } - self.spawn_download_and_wait(timeline).await?; + self.spawn_download_and_wait(timeline, permit).await? } else { // the file is present locally, probably by a previous but cancelled call to // get_or_maybe_download. alternatively we might be running without remote storage. LAYER_IMPL_METRICS.inc_init_needed_no_download(); - } + + permit + }; let res = Arc::new(DownloadedLayer { owner: Arc::downgrade(self), @@ -684,12 +686,21 @@ impl LayerInner { LayerResidenceEventReason::ResidenceChange, ); - Ok(ResidentOrWantedEvicted::Resident(res)) + Ok((ResidentOrWantedEvicted::Resident(res), permit)) }; - let (weak, _permit) = { - // should we be able to give the permit to the `get_or_init`? would make sense. - drop(permit.take()); + if let Some(init_permit) = init_permit.take() { + // use the already held initialization permit because it is impossible to hit the + // below paths anymore essentially limiting the max loop iterations to 2. + let (value, init_permit) = download(init_permit).await?; + let mut guard = self.inner.set(value, init_permit); + let strong = guard + .get_and_upgrade() + .expect("init creates strong reference, we held the init permit"); + return Ok(strong); + } + + let (weak, permit) = { let mut locked = self.inner.get_or_init(download).await?; if let Some(strong) = locked.get_and_upgrade() { @@ -716,7 +727,7 @@ impl LayerInner { "unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug" ); - permit = Some(_permit); + init_permit = Some(permit); LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download(); } @@ -752,10 +763,12 @@ impl LayerInner { async fn spawn_download_and_wait( self: &Arc, timeline: Arc, - ) -> Result<(), DownloadError> { + permit: heavier_once_cell::InitPermit, + ) -> Result { let task_name = format!("download layer {}", self); let (tx, rx) = tokio::sync::oneshot::channel(); + // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot // block tenant::mgr::remove_tenant_from_memory. 
@@ -789,9 +802,9 @@ impl LayerInner { } }; - if let Err(res) = tx.send(result) { + if let Err(res) = tx.send((result, permit)) { match res { - Ok(()) => { + (Ok(()), _) => { // our caller is cancellation safe so this is fine; if someone // else requests the layer, they'll find it already downloaded // or redownload. @@ -802,7 +815,7 @@ impl LayerInner { tracing::info!("layer file download completed after requester had cancelled"); LAYER_IMPL_METRICS.inc_download_completed_without_requester(); }, - Err(e) => { + (Err(e), _) => { // our caller is cancellation safe, but we might be racing with // another attempt to initialize. before we have cancellation // token support: these attempts should converge regardless of @@ -818,7 +831,7 @@ impl LayerInner { .in_current_span(), ); match rx.await { - Ok(Ok(())) => { + Ok((Ok(()), permit)) => { if let Some(reason) = self .needs_download() .await @@ -830,9 +843,10 @@ impl LayerInner { self.consecutive_failures.store(0, Ordering::Relaxed); - Ok(()) + Ok(permit) } - Ok(Err(e)) => { + Ok((Err(e), _permit)) => { + // FIXME: this should be with the spawned task and be cancellation sensitive let consecutive_failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed); tracing::error!(consecutive_failures, "layer file download failed: {e:#}"); From 5650138532e5e3ca4a06f15f3ed79d6a3545f5a0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 2 Nov 2023 09:14:26 +0000 Subject: [PATCH 4/6] pageserver: helpers for explicitly dying on fatal I/O errors (#5651) Following from discussion on https://github.com/neondatabase/neon/pull/5436 where hacking an implicit die-on-fatal-io behavior into an Error type was a source of disagreement -- in this PR, dying on fatal I/O errors is explicit, with `fatal_err` and `maybe_fatal_err` helpers in the `MaybeFatalIo` trait, which is implemented for std::io::Result. To enable this approach with `crashsafe_overwrite`, the return type of that function is changed to std::io::Result -- the previous error enum for this function was not used for any logic, and the utility of saying exactly which step in the function failed is outweighed by the hygiene of having an I/O function return an io::Result. The initial use case for these helpers is the deletion queue.
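For review purposes, here is a minimal usage sketch of the two helpers (assuming the `MaybeFatalIo` trait introduced below is in scope; the path and context strings are illustrative only, not code from this PR):

```rust
use crate::virtual_file::MaybeFatalIo;

async fn persist_example(path: &camino::Utf8Path, bytes: &[u8]) -> std::io::Result<()> {
    // Writes may fail for retryable reasons such as ENOSPC: propagate those,
    // but abort the process on fatal errors (EIO, EROFS, EACCES).
    tokio::fs::write(path, bytes)
        .await
        .maybe_fatal_err("write example file")?;

    // Reads of files we just wrote are expected to succeed; any I/O error
    // here terminates the process, logging the given context string.
    let _contents = tokio::fs::read(path).await.fatal_err("read example file");
    Ok(())
}
```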
--- pageserver/src/deletion_queue.rs | 6 +- pageserver/src/deletion_queue/list_writer.rs | 31 ++--- pageserver/src/deletion_queue/validator.rs | 14 +- pageserver/src/virtual_file.rs | 135 +++++++++++-------- 4 files changed, 104 insertions(+), 82 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 22efa23f10..7b2db929fa 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -10,6 +10,7 @@ use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::metrics; use crate::tenant::remote_timeline_client::remote_layer_path; use crate::tenant::remote_timeline_client::remote_timeline_path; +use crate::virtual_file::MaybeFatalIo; use crate::virtual_file::VirtualFile; use anyhow::Context; use camino::Utf8PathBuf; @@ -271,7 +272,9 @@ impl DeletionHeader { let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes) .await - .map_err(Into::into) + .maybe_fatal_err("save deletion header")?; + + Ok(()) } } @@ -360,6 +363,7 @@ impl DeletionList { let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes) .await + .maybe_fatal_err("save deletion list") .map_err(Into::into) } } diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index 6727957b2a..28daae2da5 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX; use crate::metrics; use crate::tenant::remote_timeline_client::remote_layer_path; use crate::tenant::storage_layer::LayerFileName; +use crate::virtual_file::on_fatal_io_error; +use crate::virtual_file::MaybeFatalIo; // The number of keys in a DeletionList before we will proactively persist it // (without reaching a flush deadline). This aims to deliver objects of the order @@ -195,7 +197,7 @@ impl ListWriter { debug!("Deletion header {header_path} not found, first start?"); Ok(None) } else { - Err(anyhow::anyhow!(e)) + on_fatal_io_error(&e, "reading deletion header"); } } } @@ -216,16 +218,9 @@ impl ListWriter { self.pending.sequence = validated_sequence + 1; let deletion_directory = self.conf.deletion_prefix(); - let mut dir = match tokio::fs::read_dir(&deletion_directory).await { - Ok(d) => d, - Err(e) => { - warn!("Failed to open deletion list directory {deletion_directory}: {e:#}"); - - // Give up: if we can't read the deletion list directory, we probably can't - // write lists into it later, so the queue won't work. - return Err(e.into()); - } - }; + let mut dir = tokio::fs::read_dir(&deletion_directory) + .await + .fatal_err("read deletion directory"); let list_name_pattern = Regex::new("(?[a-zA-Z0-9]{16})-(?[a-zA-Z0-9]{2}).list").unwrap(); @@ -233,7 +228,7 @@ impl ListWriter { let temp_extension = format!(".{TEMP_SUFFIX}"); let header_path = self.conf.deletion_header_path(); let mut seqs: Vec = Vec::new(); - while let Some(dentry) = dir.next_entry().await? 
{ + while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") { let file_name = dentry.file_name(); let dentry_str = file_name.to_string_lossy(); @@ -246,11 +241,9 @@ impl ListWriter { info!("Cleaning up temporary file {dentry_str}"); let absolute_path = deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path")); - if let Err(e) = tokio::fs::remove_file(&absolute_path).await { - // Non-fatal error: we will just leave the file behind but not - // try and load it. - warn!("Failed to clean up temporary file {absolute_path}: {e:#}"); - } + tokio::fs::remove_file(&absolute_path) + .await + .fatal_err("delete temp file"); continue; } @@ -290,7 +283,9 @@ impl ListWriter { for s in seqs { let list_path = self.conf.deletion_list_path(s); - let list_bytes = tokio::fs::read(&list_path).await?; + let list_bytes = tokio::fs::read(&list_path) + .await + .fatal_err("read deletion list"); let mut deletion_list = match serde_json::from_slice::(&list_bytes) { Ok(l) => l, diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index a2cbfb9dc7..72bdbdefd6 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -28,6 +28,7 @@ use crate::config::PageServerConf; use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::control_plane_client::RetryForeverError; use crate::metrics; +use crate::virtual_file::MaybeFatalIo; use super::deleter::DeleterMessage; use super::DeletionHeader; @@ -287,16 +288,9 @@ where async fn cleanup_lists(&mut self, list_paths: Vec) { for list_path in list_paths { debug!("Removing deletion list {list_path}"); - - if let Err(e) = tokio::fs::remove_file(&list_path).await { - // Unexpected: we should have permissions and nothing else should - // be touching these files. We will leave the file behind. Subsequent - // pageservers will try and load it again: hopefully whatever storage - // issue (probably permissions) has been fixed by then. - tracing::error!("Failed to delete {list_path}: {e:#}"); - metrics::DELETION_QUEUE.unexpected_errors.inc(); - break; - } + tokio::fs::remove_file(&list_path) + .await + .fatal_err("remove deletion list"); } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index a2e8f30e15..b58b883ab6 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,6 +19,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{RwLock, RwLockWriteGuard}; +use utils::fs_ext; /// /// A virtual file descriptor. 
You can use this just like std::fs::File, but internally @@ -173,37 +174,78 @@ impl OpenFiles { } } -#[derive(Debug, thiserror::Error)] -pub enum CrashsafeOverwriteError { - #[error("final path has no parent dir")] - FinalPathHasNoParentDir, - #[error("remove tempfile")] - RemovePreviousTempfile(#[source] std::io::Error), - #[error("create tempfile")] - CreateTempfile(#[source] std::io::Error), - #[error("write tempfile")] - WriteContents(#[source] std::io::Error), - #[error("sync tempfile")] - SyncTempfile(#[source] std::io::Error), - #[error("rename tempfile to final path")] - RenameTempfileToFinalPath(#[source] std::io::Error), - #[error("open final path parent dir")] - OpenFinalPathParentDir(#[source] std::io::Error), - #[error("sync final path parent dir")] - SyncFinalPathParentDir(#[source] std::io::Error), +/// Identify error types that should always terminate the process. Other +/// error types may be eligible for retry. +pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool { + use nix::errno::Errno::*; + match e.raw_os_error().map(nix::errno::from_i32) { + Some(EIO) => { + // Terminate on EIO because we no longer trust the device to store + // data safely, or to uphold persistence guarantees on fsync. + true + } + Some(EROFS) => { + // Terminate on EROFS because a filesystem is usually remounted + // readonly when it has experienced some critical issue, so the same + // logic as EIO applies. + true + } + Some(EACCES) => { + // Terminate on EACCES because we should always have permissions + // for our own data dir: if we don't, then we can't do our job and + // need administrative intervention to fix permissions. Terminating + // is the best way to make sure we stop cleanly rather than going + // into infinite retry loops, and will make it clear to the outside + // world that we need help. + true + } + _ => { + // Treat all other local file I/O errors as retryable. This includes: + // - ENOSPC: we stay up and wait for eviction to free some space + // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue + // - WriteZero, Interrupted: these are used internally by VirtualFile + false + } + } } -impl CrashsafeOverwriteError { - /// Returns true iff the new contents are durably stored. - pub fn are_new_contents_durable(&self) -> bool { + +/// Call this when the local filesystem gives us an error with an external +/// cause: this includes EIO, EROFS, and EACCES: all these indicate either +/// bad storage or bad configuration, and we can't fix that from inside +/// a running process. +pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! { + tracing::error!("Fatal I/O error: {e}: {context}"); + std::process::abort(); +} + +pub(crate) trait MaybeFatalIo<T> { + fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>; + fn fatal_err(self, context: &str) -> T; +} + +impl<T> MaybeFatalIo<T> for std::io::Result<T> { + /// Terminate the process if the result is an error of a fatal type, else pass it through + /// + /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but + /// not on ENOSPC. + fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> { + if let Err(e) = &self { + if is_fatal_io_error(e) { + on_fatal_io_error(e, context); + } + } + self + } + + /// Terminate the process on any I/O error. + /// + /// This is appropriate for reads on files that we know exist: they should always work.
+ fn fatal_err(self, context: &str) -> T { match self { - Self::FinalPathHasNoParentDir => false, - Self::RemovePreviousTempfile(_) => false, - Self::CreateTempfile(_) => false, - Self::WriteContents(_) => false, - Self::SyncTempfile(_) => false, - Self::RenameTempfileToFinalPath(_) => false, - Self::OpenFinalPathParentDir(_) => false, - Self::SyncFinalPathParentDir(_) => true, + Ok(v) => v, + Err(e) => { + on_fatal_io_error(&e, context); + } } } } @@ -284,15 +326,13 @@ impl VirtualFile { final_path: &Utf8Path, tmp_path: &Utf8Path, content: &[u8], - ) -> Result<(), CrashsafeOverwriteError> { + ) -> std::io::Result<()> { let Some(final_path_parent) = final_path.parent() else { - return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir); + return Err(std::io::Error::from_raw_os_error( + nix::errno::Errno::EINVAL as i32, + )); }; - match std::fs::remove_file(tmp_path) { - Ok(()) => {} - Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} - Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)), - } + std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; let mut file = Self::open_with_options( tmp_path, OpenOptions::new() @@ -301,31 +341,20 @@ impl VirtualFile { // we bail out instead of causing damage. .create_new(true), ) - .await - .map_err(CrashsafeOverwriteError::CreateTempfile)?; - file.write_all(content) - .await - .map_err(CrashsafeOverwriteError::WriteContents)?; - file.sync_all() - .await - .map_err(CrashsafeOverwriteError::SyncTempfile)?; + .await?; + file.write_all(content).await?; + file.sync_all().await?; drop(file); // before the rename, that's important! // renames are atomic - std::fs::rename(tmp_path, final_path) - .map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?; + std::fs::rename(tmp_path, final_path)?; // Only open final path parent dirfd now, so that this operation only // ever holds one VirtualFile fd at a time. That's important because // the current `find_victim_slot` impl might pick the same slot for both // VirtualFile., and it eventually does a blocking write lock instead of // try_lock. let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)) - .await - .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; - final_parent_dirfd - .sync_all() - .await - .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?; + Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; + final_parent_dirfd.sync_all().await?; Ok(()) } From 3737fe3a4bf50d8e24aa6eea400e6464aa73bcda Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 2 Nov 2023 13:03:38 +0200 Subject: [PATCH 5/6] fix(layer): error out early if layer path is non-file (#5756) In an earlier PR https://github.com/neondatabase/neon/pull/5743#discussion_r1378625244 I added a FIXME and there's a simple solution suggested by @jcsp, so implement it. Wondering why I did not implement this originally, there is no concept of a permanent failure, so this failure will happen quite often. I don't think the frequency is a problem however. Sadly for std::fs::FileType there is only decimal and hex formatting, no octal. 
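As an aside on that remark: if the mode bits ever need to be shown in octal, one possible workaround (not part of this patch, just a sketch) is to take them from `Metadata` on Unix instead of `FileType`:

```rust
use std::os::unix::fs::MetadataExt;

fn describe_file_type(m: &std::fs::Metadata) -> String {
    // FileType's Debug formatting does not offer octal, but the raw st_mode
    // is available on the Metadata and can be formatted with `{:o}`.
    format!("{:?} (mode 0o{:o})", m.file_type(), m.mode())
}
```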
--- pageserver/src/tenant/storage_layer/layer.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 94edfa6fe0..fc4ba75dfc 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -637,14 +637,16 @@ impl LayerInner { // check if we really need to be downloaded; could have been already downloaded by a // cancelled previous attempt. - // - // FIXME: what if it's a directory? that is currently needs_download == true let needs_download = self .needs_download() .await .map_err(DownloadError::PreStatFailed)?; let permit = if let Some(reason) = needs_download { + if let NeedsDownload::NotFile(ft) = reason { + return Err(DownloadError::NotFile(ft)); + } + // only reset this after we've decided we really need to download. otherwise it'd // be impossible to mark cancelled downloads for eviction, like one could imagine // we would like to do for prefetching which was not needed. @@ -883,7 +885,7 @@ impl LayerInner { fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> { // in future, this should include sha2-256 validation of the file. if !m.is_file() { - Err(NeedsDownload::NotFile) + Err(NeedsDownload::NotFile(m.file_type())) } else if m.len() != self.desc.file_size { Err(NeedsDownload::WrongSize { actual: m.len(), @@ -1082,6 +1084,8 @@ enum DownloadError { ContextAndConfigReallyDeniesDownloads, #[error("downloading is really required but not allowed by this method")] DownloadRequired, + #[error("layer path exists, but it is not a file: {0:?}")] + NotFile(std::fs::FileType), /// Why no error here? Because it will be reported by page_service. We should had also done /// retries already. #[error("downloading evicted layer file failed")] @@ -1097,7 +1101,7 @@ enum DownloadError { #[derive(Debug, PartialEq)] pub(crate) enum NeedsDownload { NotFound, - NotFile, + NotFile(std::fs::FileType), WrongSize { actual: u64, expected: u64 }, } @@ -1105,7 +1109,7 @@ impl std::fmt::Display for NeedsDownload { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { NeedsDownload::NotFound => write!(f, "file was not found"), - NeedsDownload::NotFile => write!(f, "path is not a file"), + NeedsDownload::NotFile(ft) => write!(f, "path is not a file; {ft:?}"), NeedsDownload::WrongSize { actual, expected } => { write!(f, "file size mismatch {actual} vs. {expected}") } From 098d3111a5c4fb057571246d637edfa54d95b334 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 2 Nov 2023 15:06:14 +0200 Subject: [PATCH 6/6] fix(layer): get_and_upgrade and metrics (#5767) when introducing `get_and_upgrade` I forgot that an `evict_and_wait` would have already incremented the counter for started evictions, but an upgrade would just "silently" cancel the eviction as no drop would ever run. these metrics are likely sources for alerts with the next release, so it's important to keep them correct.
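In other words, the accounting invariant this patch restores is that every started eviction is eventually counted as either completed or cancelled. A toy model of that bookkeeping (names are illustrative, not the pageserver's actual metrics API):

```rust
// Toy counters standing in for the real eviction metrics.
#[derive(Default)]
struct EvictionCounters {
    started: u64,
    completed: u64,
    cancelled: u64,
}

impl EvictionCounters {
    fn balanced(&self) -> bool {
        self.started == self.completed + self.cancelled
    }
}

fn main() {
    let mut c = EvictionCounters::default();
    c.started += 1;   // evict_and_wait saw a resident layer and started an eviction
    c.cancelled += 1; // a later access upgraded the Weak back, so count a cancellation
    assert!(c.balanced(), "without the cancellation increment, alerts on these counters misfire");
}
```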
--- pageserver/src/tenant/storage_layer/layer.rs | 27 ++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index fc4ba75dfc..b320c02f9b 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -337,18 +337,16 @@ enum ResidentOrWantedEvicted { } impl ResidentOrWantedEvicted { - /// If `Some` is returned, the ResidentOrWantedEvicted has been upgraded back from - /// `ResidentOrWantedEvicted::WantedEvicted` to `ResidentOrWantedEvicted::Resident`. - fn get_and_upgrade(&mut self) -> Option> { + fn get_and_upgrade(&mut self) -> Option<(Arc, bool)> { match self { - ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()), + ResidentOrWantedEvicted::Resident(strong) => Some((strong.clone(), false)), ResidentOrWantedEvicted::WantedEvicted(weak, _) => match weak.upgrade() { Some(strong) => { LAYER_IMPL_METRICS.inc_raced_wanted_evicted_accesses(); *self = ResidentOrWantedEvicted::Resident(strong.clone()); - Some(strong) + Some((strong, true)) } None => None, }, @@ -696,7 +694,7 @@ impl LayerInner { // below paths anymore essentially limiting the max loop iterations to 2. let (value, init_permit) = download(init_permit).await?; let mut guard = self.inner.set(value, init_permit); - let strong = guard + let (strong, _upgraded) = guard .get_and_upgrade() .expect("init creates strong reference, we held the init permit"); return Ok(strong); @@ -705,11 +703,17 @@ impl LayerInner { let (weak, permit) = { let mut locked = self.inner.get_or_init(download).await?; - if let Some(strong) = locked.get_and_upgrade() { - self.wanted_evicted.store(false, Ordering::Relaxed); + if let Some((strong, upgraded)) = locked.get_and_upgrade() { + if upgraded { + // when upgraded back, the Arc is still available, but + // previously a `evict_and_wait` was received. + self.wanted_evicted.store(false, Ordering::Relaxed); - // error out any `evict_and_wait` - drop(self.status.send(Status::Downloaded)); + // error out any `evict_and_wait` + drop(self.status.send(Status::Downloaded)); + LAYER_IMPL_METRICS + .inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess); + } return Ok(strong); } else { @@ -1505,6 +1509,8 @@ enum EvictionCancelled { AlreadyReinitialized, /// Not evicted because of a pending reinitialization LostToDownload, + /// After eviction, there was a new layer access which cancelled the eviction. + UpgradedBackOnAccess, } impl EvictionCancelled { @@ -1517,6 +1523,7 @@ impl EvictionCancelled { EvictionCancelled::RemoveFailed => "remove_failed", EvictionCancelled::AlreadyReinitialized => "already_reinitialized", EvictionCancelled::LostToDownload => "lost_to_download", + EvictionCancelled::UpgradedBackOnAccess => "upgraded_back_on_access", } } }