From ee93700a0fe5548c391ba8da5f10d5841c8911db Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 29 Feb 2024 22:54:16 +0200 Subject: [PATCH] dube: timeout individual layer evictions, log progress and record metrics (#6131) Because of bugs evictions could hang and pause disk usage eviction task. One such bug is known and fixed #6928. Guard each layer eviction with a modest timeout deeming timeouted evictions as failures, to be conservative. In addition, add logging and metrics recording on each eviction iteration: - log collection completed with duration and amount of layers - per tenant collection time is observed in a new histogram - per tenant layer count is observed in a new histogram - record metric for collected, selected and evicted layer counts - log if eviction takes more than 10s - log eviction completion with eviction duration Additionally remove dead code for which no dead code warnings appeared in earlier PR. Follow-up to: #6060. --- pageserver/src/disk_usage_eviction_task.rs | 145 ++++++++--- pageserver/src/metrics.rs | 59 +++++ pageserver/src/tenant/secondary.rs | 89 ++++--- pageserver/src/tenant/storage_layer.rs | 2 +- pageserver/src/tenant/storage_layer/layer.rs | 35 ++- .../src/tenant/storage_layer/layer/tests.rs | 232 +++++++++++++++++- pageserver/src/tenant/timeline.rs | 21 +- .../src/tenant/timeline/eviction_task.rs | 13 +- 8 files changed, 492 insertions(+), 104 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index b1c6f35704..92c1475aef 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -58,6 +58,7 @@ use utils::{completion, id::TimelineId}; use crate::{ config::PageServerConf, + metrics::disk_usage_based_eviction::METRICS, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ self, @@ -65,7 +66,6 @@ use crate::{ remote_timeline_client::LayerFileMetadata, secondary::SecondaryTenant, storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName}, - Timeline, }, }; @@ -409,13 +409,23 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( "running disk usage based eviction due to pressure" ); - let candidates = + let (candidates, collection_time) = { + let started_at = std::time::Instant::now(); match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? { EvictionCandidates::Cancelled => { return Ok(IterationOutcome::Cancelled); } - EvictionCandidates::Finished(partitioned) => partitioned, - }; + EvictionCandidates::Finished(partitioned) => (partitioned, started_at.elapsed()), + } + }; + + METRICS.layers_collected.inc_by(candidates.len() as u64); + + tracing::info!( + elapsed_ms = collection_time.as_millis(), + total_layers = candidates.len(), + "collection completed" + ); // Debug-log the list of candidates let now = SystemTime::now(); @@ -446,9 +456,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( // the tenant's min-resident-size threshold, print a warning, and memorize the disk // usage at that point, in 'usage_planned_min_resident_size_respecting'. 
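The central mechanism of this patch, shown in isolation: each layer eviction future is bounded by a short timeout, and an elapsed timeout is counted as a failed eviction rather than being awaited indefinitely. A minimal, self-contained sketch of that shape follows; evict_one and EvictOutcome are hypothetical stand-ins for the real layer API, not code from this patch.

use std::time::Duration;

#[derive(Debug)]
enum EvictOutcome {
    /// eviction finished within the deadline; carries the bytes freed
    Evicted(u64),
    /// eviction failed or did not finish in time; carries the bytes not freed
    Failed(u64),
}

/// Stand-in for a single layer eviction (Layer::evict_and_wait in the real code).
async fn evict_one(_file_size: u64) -> Result<(), ()> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(())
}

/// Bound one eviction by a short deadline: LRU ordering goes stale quickly, and a
/// hung eviction must not stall the whole disk-usage-based eviction iteration.
async fn evict_with_deadline(file_size: u64) -> EvictOutcome {
    let deadline = Duration::from_secs(5);
    match tokio::time::timeout(deadline, evict_one(file_size)).await {
        Ok(Ok(())) => EvictOutcome::Evicted(file_size),
        Ok(Err(())) | Err(_elapsed) => EvictOutcome::Failed(file_size),
    }
}

The timeout only stops the waiting; whatever work the eviction had already started continues in the background, which is why timed-out evictions are conservatively counted as failures.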
- let selection = select_victims(&candidates, usage_pre); + let (evicted_amount, usage_planned) = + select_victims(&candidates, usage_pre).into_amount_and_planned(); - let (evicted_amount, usage_planned) = selection.into_amount_and_planned(); + METRICS.layers_selected.inc_by(evicted_amount as u64); // phase2: evict layers @@ -477,9 +488,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( if let Some(next) = next { match next { Ok(Ok(file_size)) => { + METRICS.layers_evicted.inc(); usage_assumed.add_available_bytes(file_size); } - Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => { + Ok(Err(( + file_size, + EvictionError::NotFound + | EvictionError::Downloaded + | EvictionError::Timeout, + ))) => { evictions_failed.file_sizes += file_size; evictions_failed.count += 1; } @@ -495,7 +512,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( // calling again when consumed_all is fine as evicted is fused. let Some((_partition, candidate)) = evicted.next() else { - consumed_all = true; + if !consumed_all { + tracing::info!("all evictions started, waiting"); + consumed_all = true; + } continue; }; @@ -503,11 +523,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( EvictionLayer::Attached(layer) => { let file_size = layer.layer_desc().file_size; js.spawn(async move { - layer - .evict_and_wait() - .await - .map(|()| file_size) - .map_err(|e| (file_size, e)) + // have a low eviction waiting timeout because our LRU calculations go stale fast; + // also individual layer evictions could hang because of bugs and we do not want to + // pause disk_usage_based_eviction for such. + let timeout = std::time::Duration::from_secs(5); + + match layer.evict_and_wait(timeout).await { + Ok(()) => Ok(file_size), + Err(e) => Err((file_size, e)), + } }); } EvictionLayer::Secondary(layer) => { @@ -529,6 +553,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( (usage_assumed, evictions_failed) }; + let started_at = std::time::Instant::now(); + + let evict_layers = async move { + let mut evict_layers = std::pin::pin!(evict_layers); + + let maximum_expected = std::time::Duration::from_secs(10); + + let res = tokio::time::timeout(maximum_expected, &mut evict_layers).await; + let tuple = if let Ok(tuple) = res { + tuple + } else { + let elapsed = started_at.elapsed(); + tracing::info!(elapsed_ms = elapsed.as_millis(), "still ongoing"); + evict_layers.await + }; + + let elapsed = started_at.elapsed(); + tracing::info!(elapsed_ms = elapsed.as_millis(), "completed"); + tuple + }; + + let evict_layers = + evict_layers.instrument(tracing::info_span!("evict_layers", layers=%evicted_amount)); + let (usage_assumed, evictions_failed) = tokio::select! 
{ tuple = evict_layers => { tuple }, _ = cancel.cancelled() => { @@ -763,6 +811,8 @@ async fn collect_eviction_candidates( eviction_order: EvictionOrder, cancel: &CancellationToken, ) -> anyhow::Result { + const LOG_DURATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(10); + // get a snapshot of the list of tenants let tenants = tenant::mgr::list_tenants() .await @@ -791,6 +841,8 @@ async fn collect_eviction_candidates( continue; } + let started_at = std::time::Instant::now(); + // collect layers from all timelines in this tenant // // If one of the timelines becomes `!is_active()` during the iteration, @@ -805,6 +857,7 @@ async fn collect_eviction_candidates( } let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); + tenant_candidates.extend(info.resident_layers.into_iter()); max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0)); @@ -870,7 +923,25 @@ async fn collect_eviction_candidates( (partition, candidate) }); + METRICS + .tenant_layer_count + .observe(tenant_candidates.len() as f64); + candidates.extend(tenant_candidates); + + let elapsed = started_at.elapsed(); + METRICS + .tenant_collection_time + .observe(elapsed.as_secs_f64()); + + if elapsed > LOG_DURATION_THRESHOLD { + tracing::info!( + tenant_id=%tenant.tenant_shard_id().tenant_id, + shard_id=%tenant.tenant_shard_id().shard_slug(), + elapsed_ms = elapsed.as_millis(), + "collection took longer than threshold" + ); + } } // Note: the same tenant ID might be hit twice, if it transitions from attached to @@ -885,11 +956,11 @@ async fn collect_eviction_candidates( }, ); - for secondary_tenant in secondary_tenants { + for tenant in secondary_tenants { // for secondary tenants we use a sum of on_disk layers and already evicted layers. this is // to prevent repeated disk usage based evictions from completely draining less often // updating secondaries. 
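The progress logging wrapped around the whole eviction batch earlier in this file follows a reusable shape: race the pinned work against a soft deadline, log once if the deadline elapses, then keep awaiting the same future and log the total duration at the end. A reduced sketch, assuming only a tokio runtime and the tracing crate; do_work is a made-up placeholder for the eviction batch.

use std::time::{Duration, Instant};

/// Placeholder for the actual eviction batch.
async fn do_work() -> usize {
    tokio::time::sleep(Duration::from_millis(50)).await;
    42
}

async fn run_with_progress_log() -> usize {
    let started_at = Instant::now();
    let mut work = std::pin::pin!(do_work());

    let soft_deadline = Duration::from_secs(10);
    let out = match tokio::time::timeout(soft_deadline, &mut work).await {
        Ok(out) => out,
        Err(_elapsed) => {
            // not a failure: the work is not cancelled, we only report that it is slow
            tracing::info!(elapsed_ms = started_at.elapsed().as_millis(), "still ongoing");
            work.await
        }
    };

    tracing::info!(elapsed_ms = started_at.elapsed().as_millis(), "completed");
    out
}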
- let (mut layer_info, total_layers) = secondary_tenant.get_layers_for_eviction(); + let (mut layer_info, total_layers) = tenant.get_layers_for_eviction(); debug_assert!( total_layers >= layer_info.resident_layers.len(), @@ -897,6 +968,8 @@ async fn collect_eviction_candidates( layer_info.resident_layers.len() ); + let started_at = std::time::Instant::now(); + layer_info .resident_layers .sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts)); @@ -918,9 +991,27 @@ async fn collect_eviction_candidates( ) }); + METRICS + .tenant_layer_count + .observe(tenant_candidates.len() as f64); candidates.extend(tenant_candidates); tokio::task::yield_now().await; + + let elapsed = started_at.elapsed(); + + METRICS + .tenant_collection_time + .observe(elapsed.as_secs_f64()); + + if elapsed > LOG_DURATION_THRESHOLD { + tracing::info!( + tenant_id=%tenant.tenant_shard_id().tenant_id, + shard_id=%tenant.tenant_shard_id().shard_slug(), + elapsed_ms = elapsed.as_millis(), + "collection took longer than threshold" + ); + } } debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below, @@ -997,30 +1088,6 @@ impl VictimSelection { } } -struct TimelineKey(Arc); - -impl PartialEq for TimelineKey { - fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.0, &other.0) - } -} - -impl Eq for TimelineKey {} - -impl std::hash::Hash for TimelineKey { - fn hash(&self, state: &mut H) { - Arc::as_ptr(&self.0).hash(state); - } -} - -impl std::ops::Deref for TimelineKey { - type Target = Timeline; - - fn deref(&self) -> &Self::Target { - self.0.as_ref() - } -} - /// A totally ordered f32 subset we can use with sorting functions. pub(crate) mod finite_f32 { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 1749e02c7f..1d894ed8a5 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2474,6 +2474,64 @@ pub(crate) mod tenant_throttling { } } +pub(crate) mod disk_usage_based_eviction { + use super::*; + + pub(crate) struct Metrics { + pub(crate) tenant_collection_time: Histogram, + pub(crate) tenant_layer_count: Histogram, + pub(crate) layers_collected: IntCounter, + pub(crate) layers_selected: IntCounter, + pub(crate) layers_evicted: IntCounter, + } + + impl Default for Metrics { + fn default() -> Self { + let tenant_collection_time = register_histogram!( + "pageserver_disk_usage_based_eviction_tenant_collection_seconds", + "Time spent collecting layers from a tenant -- not normalized by collected layer amount", + vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0] + ) + .unwrap(); + + let tenant_layer_count = register_histogram!( + "pageserver_disk_usage_based_eviction_tenant_collected_layers", + "Amount of layers gathered from a tenant", + vec![5.0, 50.0, 500.0, 5000.0, 50000.0] + ) + .unwrap(); + + let layers_collected = register_int_counter!( + "pageserver_disk_usage_based_eviction_collected_layers_total", + "Amount of layers collected" + ) + .unwrap(); + + let layers_selected = register_int_counter!( + "pageserver_disk_usage_based_eviction_select_layers_total", + "Amount of layers selected" + ) + .unwrap(); + + let layers_evicted = register_int_counter!( + "pageserver_disk_usage_based_eviction_evicted_layers_total", + "Amount of layers successfully evicted" + ) + .unwrap(); + + Self { + tenant_collection_time, + tenant_layer_count, + layers_collected, + layers_selected, + layers_evicted, + } + } + } + + pub(crate) static METRICS: Lazy = Lazy::new(Metrics::default); +} + pub fn preinitialize_metrics() { // Python tests need 
these and on some we do alerting. // @@ -2508,6 +2566,7 @@ pub fn preinitialize_metrics() { Lazy::force(&TENANT_MANAGER); Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS); + Lazy::force(&disk_usage_based_eviction::METRICS); // countervecs [&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT] diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index c466ac0c24..14e88b836e 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -32,7 +32,7 @@ use remote_storage::GenericRemoteStorage; use tokio_util::sync::CancellationToken; use tracing::instrument; -use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate}; +use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate}; enum DownloadCommand { Download(TenantShardId), @@ -121,6 +121,10 @@ impl SecondaryTenant { }) } + pub(crate) fn tenant_shard_id(&self) -> TenantShardId { + self.tenant_shard_id + } + pub(crate) async fn shutdown(&self) { self.cancel.cancel(); @@ -164,16 +168,17 @@ impl SecondaryTenant { self.detail.lock().unwrap().get_layers_for_eviction(self) } + /// Cancellation safe, but on cancellation the eviction will go through #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))] pub(crate) async fn evict_layer( - &self, + self: &Arc, conf: &PageServerConf, timeline_id: TimelineId, name: LayerFileName, ) { debug_assert_current_span_has_tenant_id(); - let _guard = match self.gate.enter() { + let guard = match self.gate.enter() { Ok(g) => g, Err(_) => { tracing::debug!("Dropping layer evictions, secondary tenant shutting down",); @@ -187,35 +192,57 @@ impl SecondaryTenant { .timeline_path(&self.tenant_shard_id, &timeline_id) .join(name.file_name()); - // We tolerate ENOENT, because between planning eviction and executing - // it, the secondary downloader could have seen an updated heatmap that - // resulted in a layer being deleted. - // Other local I/O errors are process-fatal: these should never happen. - tokio::fs::remove_file(path) - .await - .or_else(fs_ext::ignore_not_found) - .fatal_err("Deleting layer during eviction"); + let this = self.clone(); - // Update the timeline's state. This does not have to be synchronized with - // the download process, because: - // - If downloader is racing with us to remove a file (e.g. because it is - // removed from heatmap), then our mutual .remove() operations will both - // succeed. - // - If downloader is racing with us to download the object (this would require - // multiple eviction iterations to race with multiple download iterations), then - // if we remove it from the state, the worst that happens is the downloader - // downloads it again before re-inserting, or we delete the file but it remains - // in the state map (in which case it will be downloaded if this secondary - // tenant transitions to attached and tries to access it) - // - // The important assumption here is that the secondary timeline state does not - // have to 100% match what is on disk, because it's a best-effort warming - // of the cache. 
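Cancellation safety of the secondary-tenant eviction comes from moving the filesystem work into spawn_blocking: once the task is spawned it owns everything it needs, so the removal and the state update run to completion even if the caller's future is dropped. A reduced sketch of the removal half, with a simplified error policy (the real code treats unexpected I/O errors as fatal):

use std::io;
use std::path::PathBuf;

/// Remove a layer file from a blocking thread, tolerating a concurrent deletion.
/// Returns Ok(true) if this call deleted the file, Ok(false) if it was already gone.
async fn remove_evicted_file(path: PathBuf) -> io::Result<bool> {
    tokio::task::spawn_blocking(move || match std::fs::remove_file(&path) {
        Ok(()) => Ok(true),
        // between planning and executing the eviction, the downloader may have
        // removed the file already; that is not an error for this caller
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e),
    })
    .await
    .expect("blocking file removal should not panic")
}

Skipping the accounting update when nothing was deleted (the Ok(false) case) avoids recording a newer evicted_at timestamp for a layer this call never touched.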
- let mut detail = self.detail.lock().unwrap(); - if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) { - timeline_detail.on_disk_layers.remove(&name); - timeline_detail.evicted_at.insert(name, now); - } + // spawn it to be cancellation safe + tokio::task::spawn_blocking(move || { + let _guard = guard; + // We tolerate ENOENT, because between planning eviction and executing + // it, the secondary downloader could have seen an updated heatmap that + // resulted in a layer being deleted. + // Other local I/O errors are process-fatal: these should never happen. + let deleted = std::fs::remove_file(path); + + let not_found = deleted + .as_ref() + .is_err_and(|x| x.kind() == std::io::ErrorKind::NotFound); + + let deleted = if not_found { + false + } else { + deleted + .map(|()| true) + .fatal_err("Deleting layer during eviction") + }; + + if !deleted { + // skip updating accounting and putting perhaps later timestamp + return; + } + + // Update the timeline's state. This does not have to be synchronized with + // the download process, because: + // - If downloader is racing with us to remove a file (e.g. because it is + // removed from heatmap), then our mutual .remove() operations will both + // succeed. + // - If downloader is racing with us to download the object (this would require + // multiple eviction iterations to race with multiple download iterations), then + // if we remove it from the state, the worst that happens is the downloader + // downloads it again before re-inserting, or we delete the file but it remains + // in the state map (in which case it will be downloaded if this secondary + // tenant transitions to attached and tries to access it) + // + // The important assumption here is that the secondary timeline state does not + // have to 100% match what is on disk, because it's a best-effort warming + // of the cache. + let mut detail = this.detail.lock().unwrap(); + if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) { + timeline_detail.on_disk_layers.remove(&name); + timeline_detail.evicted_at.insert(name, now); + } + }) + .await + .expect("secondary eviction should not have panicked"); } } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9de820912e..299950cc21 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -72,7 +72,7 @@ where /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data' /// call, to collect more records. /// -#[derive(Debug)] +#[derive(Debug, Default)] pub struct ValueReconstructState { pub records: Vec<(Lsn, NeonWalRecord)>, pub img: Option<(Lsn, Bytes)>, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 13c9e5c989..247dd1a8e4 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -8,7 +8,7 @@ use pageserver_api::shard::ShardIndex; use std::ops::Range; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use tracing::Instrument; use utils::lsn::Lsn; use utils::sync::heavier_once_cell; @@ -208,10 +208,15 @@ impl Layer { /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is /// re-downloaded, [`EvictionError::Downloaded`] is returned. 
/// + /// Timeout is mandatory, because waiting for eviction is only needed for our tests; eviction + /// will happen regardless the future returned by this method completing unless there is a + /// read access (currently including [`Layer::keep_resident`]) before eviction gets to + /// complete. + /// /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation /// of download-evict cycle on retry. - pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> { - self.0.evict_and_wait().await + pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> { + self.0.evict_and_wait(timeout).await } /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload @@ -363,7 +368,7 @@ impl Layer { /// /// Does not start local deletion, use [`Self::delete_on_drop`] for that /// separatedly. - #[cfg(feature = "testing")] + #[cfg(any(feature = "testing", test))] pub(crate) fn wait_drop(&self) -> impl std::future::Future + 'static { let mut rx = self.0.status.subscribe(); @@ -632,7 +637,7 @@ impl LayerInner { /// Cancellation safe, however dropping the future and calling this method again might result /// in a new attempt to evict OR join the previously started attempt. - pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> { + pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> { use tokio::sync::broadcast::error::RecvError; assert!(self.have_remote_client); @@ -652,16 +657,22 @@ impl LayerInner { if strong.is_some() { // drop the DownloadedLayer outside of the holding the guard drop(strong); + + // idea here is that only one evicter should ever get to witness a strong reference, + // which means whenever get_or_maybe_download upgrades a weak, it must mark up a + // cancelled eviction and signal us, like it currently does. + // + // a second concurrent evict_and_wait will not see a strong reference. LAYER_IMPL_METRICS.inc_started_evictions(); } - match rx.recv().await { - Ok(Status::Evicted) => Ok(()), - Ok(Status::Downloaded) => Err(EvictionError::Downloaded), - Err(RecvError::Closed) => { + match tokio::time::timeout(timeout, rx.recv()).await { + Ok(Ok(Status::Evicted)) => Ok(()), + Ok(Ok(Status::Downloaded)) => Err(EvictionError::Downloaded), + Ok(Err(RecvError::Closed)) => { unreachable!("sender cannot be dropped while we are in &self method") } - Err(RecvError::Lagged(_)) => { + Ok(Err(RecvError::Lagged(_))) => { // this is quite unlikely, but we are blocking a lot in the async context, so // we might be missing this because we are stuck on a LIFO slot on a thread // which is busy blocking for a 1TB database create_image_layers. @@ -674,6 +685,7 @@ impl LayerInner { None => Ok(()), } } + Err(_timeout) => Err(EvictionError::Timeout), } } @@ -1195,6 +1207,9 @@ pub(crate) enum EvictionError { /// Evictions must always lose to downloads in races, and this time it happened. 
#[error("layer was downloaded instead")] Downloaded, + + #[error("eviction did not happen within timeout")] + Timeout, } /// Error internal to the [`LayerInner::get_or_maybe_download`] diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 01c62b6f83..b43534efd4 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -1,13 +1,173 @@ use futures::StreamExt; +use pageserver_api::key::CONTROLFILE_KEY; use tokio::task::JoinSet; +use tracing::Instrument; use utils::{ completion::{self, Completion}, id::TimelineId, }; use super::*; -use crate::task_mgr::BACKGROUND_RUNTIME; -use crate::tenant::harness::TenantHarness; +use crate::{context::DownloadBehavior, task_mgr::BACKGROUND_RUNTIME}; +use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness}; + +/// Used in tests to advance a future to wanted await point, and not futher. +const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600); + +/// Used in tests to indicate forever long timeout; has to be longer than the amount of ADVANCE +/// timeout uses to advance futures. +const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7); + +/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions. +#[tokio::test] +async fn smoke_test() { + let handle = BACKGROUND_RUNTIME.handle(); + + let h = TenantHarness::create("smoke_test").unwrap(); + let span = h.span(); + let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); + let (tenant, _) = h.load().await; + + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); + + let timeline = tenant + .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) + .await + .unwrap(); + + let layer = { + let mut layers = { + let layers = timeline.layers.read().await; + layers.resident_layers().collect::>().await + }; + + assert_eq!(layers.len(), 1); + + layers.swap_remove(0) + }; + + // all layers created at pageserver are like `layer`, initialized with strong + // Arc. + + let img_before = { + let mut data = ValueReconstructState::default(); + layer + .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx) + .await + .unwrap(); + data.img + .take() + .expect("tenant harness writes the control file") + }; + + // important part is evicting the layer, which can be done when there are no more ResidentLayer + // instances -- there currently are none, only two `Layer` values, one in the layermap and on + // in scope. + layer.evict_and_wait(FOREVER).await.unwrap(); + + // double-evict returns an error, which is valid if both eviction_task and disk usage based + // eviction would both evict the same layer at the same time. + + let e = layer.evict_and_wait(FOREVER).await.unwrap_err(); + assert!(matches!(e, EvictionError::NotFound)); + + // on accesses when the layer is evicted, it will automatically be downloaded. + let img_after = { + let mut data = ValueReconstructState::default(); + layer + .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx) + .instrument(download_span.clone()) + .await + .unwrap(); + data.img.take().unwrap() + }; + + assert_eq!(img_before, img_after); + + // evict_and_wait can timeout, but it doesn't cancel the evicting itself + // + // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to + // artificially slow it down. 
+ let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await; + + match layer + .evict_and_wait(std::time::Duration::ZERO) + .await + .unwrap_err() + { + EvictionError::Timeout => { + // expected, but note that the eviction is "still ongoing" + helper.release().await; + // exhaust spawn_blocking pool to ensure it is now complete + SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle) + .await; + } + other => unreachable!("{other:?}"), + } + + // only way to query if a layer is resident is to acquire a ResidentLayer instance. + // Layer::keep_resident never downloads, but it might initialize if the layer file is found + // downloaded locally. + let none = layer.keep_resident().await.unwrap(); + assert!( + none.is_none(), + "Expected none, because eviction removed the local file, found: {none:?}" + ); + + // plain downloading is rarely needed + layer + .download_and_keep_resident() + .instrument(download_span) + .await + .unwrap(); + + // last important part is deletion on drop: gc and compaction use it for compacted L0 layers + // or fully garbage collected layers. deletion means deleting the local file, and scheduling a + // deletion of the already unlinked from index_part.json remote file. + // + // marking a layer to be deleted on drop is irreversible; there is no technical reason against + // reversiblity, but currently it is not needed so it is not provided. + layer.delete_on_drop(); + + let path = layer.local_path().to_owned(); + + // wait_drop produces an unconnected to Layer future which will resolve when the + // LayerInner::drop has completed. + let mut wait_drop = std::pin::pin!(layer.wait_drop()); + + // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing + // until here + tokio::time::pause(); + tokio::time::timeout(ADVANCE, &mut wait_drop) + .await + .expect_err("should had timed out because two strong references exist"); + + tokio::fs::metadata(&path) + .await + .expect("the local layer file still exists"); + + let rtc = timeline.remote_client.as_ref().unwrap(); + + { + let layers = &[layer]; + let mut g = timeline.layers.write().await; + g.finish_gc_timeline(layers); + // this just updates the remote_physical_size for demonstration purposes + rtc.schedule_gc_update(layers).unwrap(); + } + + // when strong references are dropped, the file is deleted and remote deletion is scheduled + wait_drop.await; + + let e = tokio::fs::metadata(&path) + .await + .expect_err("the local file is deleted"); + assert_eq!(e.kind(), std::io::ErrorKind::NotFound); + + rtc.wait_completion().await.unwrap(); + + assert_eq!(rtc.get_remote_physical_size(), 0); +} /// This test demonstrates a previous hang when a eviction and deletion were requested at the same /// time. Now both of them complete per Arc drop semantics. 
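The ADVANCE and FOREVER constants above lean on tokio's paused clock: once time is paused, a timeout wrapping a future that cannot make progress elapses immediately, because the runtime auto-advances the virtual clock when every task is idle. That is what lets an hour-long timeout act as "drive this future to its current await point" without real waiting. A tiny illustration of the runtime behavior (requires tokio's test-util feature):

use std::time::Duration;

/// With start_paused = true the clock is virtual: the hour-long timeout below
/// "elapses" instantly once the runtime sees that nothing can make progress.
#[tokio::test(start_paused = true)]
async fn pending_future_times_out_instantly() {
    let never = std::future::pending::<()>();
    tokio::time::timeout(Duration::from_secs(3600), never)
        .await
        .expect_err("a pending future never completes; the paused clock jumps ahead instead");
}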
@@ -41,10 +201,10 @@ async fn evict_and_wait_on_wanted_deleted() { let resident = layer.keep_resident().await.unwrap(); { - let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait()); + let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER)); // drive the future to await on the status channel - tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait) + tokio::time::timeout(ADVANCE, &mut evict_and_wait) .await .expect_err("should had been a timeout since we are holding the layer resident"); @@ -115,10 +275,10 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() { let resident = layer.keep_resident().await.unwrap(); - let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait()); + let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER)); // drive the future to await on the status channel - tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait) + tokio::time::timeout(ADVANCE, &mut evict_and_wait) .await .expect_err("should had been a timeout since we are holding the layer resident"); assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get()); @@ -138,7 +298,7 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() { // because the keep_resident check alters wanted evicted without sending a message, we will // never get completed - let e = tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait) + let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait) .await .expect("no timeout, because keep_resident re-initialized") .expect_err("eviction should not have succeeded because re-initialized"); @@ -158,9 +318,10 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() { .sum::() ); - let mut second_eviction = std::pin::pin!(layer.evict_and_wait()); + let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER)); - tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction) + // advance to the wait on the queue + tokio::time::timeout(ADVANCE, &mut second_eviction) .await .expect_err("timeout because spawn_blocking is clogged"); @@ -171,7 +332,12 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() { helper.release().await; - tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction) + // the second_eviction gets to run here + // + // synchronize to be *strictly* after the second_eviction spawn_blocking run + SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await; + + tokio::time::timeout(ADVANCE, &mut second_eviction) .await .expect("eviction goes through now that spawn_blocking is unclogged") .expect("eviction should succeed, because version matches"); @@ -261,3 +427,49 @@ impl SpawnBlockingPoolHelper { .await } } + +#[test] +fn spawn_blocking_pool_helper_actually_works() { + // create a custom runtime for which we know and control how many blocking threads it has + // + // because the amount is not configurable for our helper, expect the same amount as + // BACKGROUND_RUNTIME using the tokio defaults would have. + let rt = tokio::runtime::Builder::new_current_thread() + .max_blocking_threads(512) + .enable_all() + .build() + .unwrap(); + + let handle = rt.handle(); + + rt.block_on(async move { + // this will not return until all threads are spun up and actually executing the code + // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d. 
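The helper's contract can be approximated with plain channels: start one blocking task per thread in the pool, have each report that it is actually running, then park it until released. A simplified, hypothetical sketch (the real SpawnBlockingPoolHelper uses completions and knows the pool size):

use tokio::sync::{mpsc, oneshot};

/// Occupy `n` spawn_blocking threads; they stay parked until the returned
/// senders are dropped. A simplified take on the tests' SpawnBlockingPoolHelper.
async fn clog_blocking_pool(n: usize) -> Vec<oneshot::Sender<()>> {
    let (started_tx, mut started_rx) = mpsc::unbounded_channel();
    let mut releases = Vec::with_capacity(n);

    for _ in 0..n {
        let started_tx = started_tx.clone();
        let (release_tx, release_rx) = oneshot::channel::<()>();
        releases.push(release_tx);

        tokio::task::spawn_blocking(move || {
            // tell the caller this thread is really executing the closure
            started_tx.send(()).unwrap();
            // park this blocking thread until the caller drops its sender
            let _ = release_rx.blocking_recv();
        });
    }

    // does not return until every thread is spun up and waiting to be released
    for _ in 0..n {
        started_rx.recv().await.unwrap();
    }

    releases
}

Dropping the returned senders unblocks every thread; clogging and releasing the pool once more afterwards (as the updated test does) guarantees that any blocking work queued in between has already run.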
+ let consumed = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await; + + println!("consumed"); + + let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || { + // this will not get to run before we release + })); + + println!("spawned"); + + tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh) + .await + .expect_err("the task should not have gotten to run yet"); + + println!("tried to join"); + + consumed.release().await; + + println!("released"); + + tokio::time::timeout(std::time::Duration::from_secs(1), jh) + .await + .expect("no timeout") + .expect("no join error"); + + println!("joined"); + }); +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fa5e7b3685..206f20306e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1512,10 +1512,14 @@ impl Timeline { return Ok(None); }; - match local_layer.evict_and_wait().await { + // curl has this by default + let timeout = std::time::Duration::from_secs(120); + + match local_layer.evict_and_wait(timeout).await { Ok(()) => Ok(Some(true)), Err(EvictionError::NotFound) => Ok(Some(false)), Err(EvictionError::Downloaded) => Ok(Some(false)), + Err(EvictionError::Timeout) => Ok(Some(false)), } } } @@ -5157,8 +5161,7 @@ mod tests { let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap(); - let ctx = any_context(); - let tenant = harness.do_try_load(&ctx).await.unwrap(); + let (tenant, ctx) = harness.load().await; let timeline = tenant .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await @@ -5172,8 +5175,10 @@ mod tests { .expect("should had been resident") .drop_eviction_guard(); - let first = async { layer.evict_and_wait().await }; - let second = async { layer.evict_and_wait().await }; + let forever = std::time::Duration::from_secs(120); + + let first = layer.evict_and_wait(forever); + let second = layer.evict_and_wait(forever); let (first, second) = tokio::join!(first, second); @@ -5192,12 +5197,6 @@ mod tests { } } - fn any_context() -> crate::context::RequestContext { - use crate::context::*; - use crate::task_mgr::*; - RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error) - } - async fn find_some_layer(timeline: &Timeline) -> Layer { let layers = timeline.layers.read().await; let desc = layers diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 008f9482c4..dd603135d2 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -204,6 +204,7 @@ impl Timeline { evicted: usize, errors: usize, not_evictable: usize, + timeouts: usize, #[allow(dead_code)] skipped_for_shutdown: usize, } @@ -267,7 +268,11 @@ impl Timeline { let layer = guard.drop_eviction_guard(); if no_activity_for > p.threshold { // this could cause a lot of allocations in some cases - js.spawn(async move { layer.evict_and_wait().await }); + js.spawn(async move { + layer + .evict_and_wait(std::time::Duration::from_secs(5)) + .await + }); stats.candidates += 1; } } @@ -280,6 +285,9 @@ impl Timeline { Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { stats.not_evictable += 1; } + Ok(Err(EvictionError::Timeout)) => { + stats.timeouts += 1; + } Err(je) if je.is_cancelled() => unreachable!("not used"), Err(je) if je.is_panic() => { /* already logged */ @@ -295,7 +303,8 @@ impl Timeline { stats = join_all => { if stats.candidates == stats.not_evictable { 
debug!(stats=?stats, "eviction iteration complete"); - } else if stats.errors > 0 || stats.not_evictable > 0 { + } else if stats.errors > 0 || stats.not_evictable > 0 || stats.timeouts > 0 { + // reminder: timeouts are not eviction cancellations warn!(stats=?stats, "eviction iteration complete"); } else { info!(stats=?stats, "eviction iteration complete");
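For completeness, the per-iteration accounting in the eviction task reduces to draining a JoinSet and classifying each result; the new timeouts counter is one more arm in that match. A simplified sketch with an illustrative Outcome enum standing in for EvictionError:

use tokio::task::JoinSet;

#[derive(Debug)]
enum Outcome {
    Evicted,
    NotEvictable,
    TimedOut,
}

#[derive(Debug, Default)]
struct EvictionStats {
    evicted: usize,
    not_evictable: usize,
    timeouts: usize,
    errors: usize,
}

/// Drain a JoinSet of per-layer evictions and tally the outcomes, mirroring the
/// shape of the eviction task's accounting (panics count as errors, cancellations
/// are ignored here).
async fn tally(mut js: JoinSet<Outcome>) -> EvictionStats {
    let mut stats = EvictionStats::default();
    while let Some(joined) = js.join_next().await {
        match joined {
            Ok(Outcome::Evicted) => stats.evicted += 1,
            Ok(Outcome::NotEvictable) => stats.not_evictable += 1,
            Ok(Outcome::TimedOut) => stats.timeouts += 1,
            Err(join_error) if join_error.is_panic() => stats.errors += 1,
            Err(_cancelled) => {}
        }
    }
    stats
}

Counting timeouts separately from other non-evictable outcomes lets the iteration-complete log escalate to a warning when evictions are stalling, not only when they lose races to downloads.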