diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index d5bdfc84b9..c3538266d8 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -60,7 +60,12 @@ use utils::serde_percent::Percent; use crate::{ config::PageServerConf, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, - tenant::{self, storage_layer::PersistentLayer, timeline::EvictionError, Timeline}, + tenant::{ + self, + storage_layer::{AsLayerDesc, LayerE}, + timeline::EvictionError, + Timeline, + }, }; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -330,9 +335,10 @@ pub async fn disk_usage_eviction_task_iteration_impl( // If we get far enough in the list that we start to evict layers that are below // the tenant's min-resident-size threshold, print a warning, and memorize the disk // usage at that point, in 'usage_planned_min_resident_size_respecting'. - let mut batched: HashMap<_, Vec>> = HashMap::new(); + let mut batched: HashMap<_, Vec<_>> = HashMap::new(); let mut warned = None; let mut usage_planned = usage_pre; + let mut max_batch_size = 0; for (i, (partition, candidate)) in candidates.into_iter().enumerate() { if !usage_planned.has_pressure() { debug!( @@ -349,10 +355,15 @@ pub async fn disk_usage_eviction_task_iteration_impl( usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size); - batched - .entry(TimelineKey(candidate.timeline)) - .or_default() - .push(candidate.layer); + let batch = batched.entry(TimelineKey(candidate.timeline)).or_default(); + + if batch.len() < u32::MAX as usize { + // semaphore will later be used to limit eviction concurrency, and we can express at + // most u32 number of permits. unlikely we would have u32::MAX layers to be evicted, + // but fail gracefully. + batch.push(candidate.layer); + max_batch_size = max_batch_size.max(batch.len()); + } } let usage_planned = match warned { @@ -369,64 +380,101 @@ pub async fn disk_usage_eviction_task_iteration_impl( // phase2: evict victims batched by timeline - // After the loop, `usage_assumed` is the post-eviction usage, - // according to internal accounting. - let mut usage_assumed = usage_pre; - let mut evictions_failed = LayerCount::default(); + let mut js = tokio::task::JoinSet::new(); + + // ratelimit to 1k files or any higher max batch size + let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size))); + for (timeline, batch) in batched { let tenant_id = timeline.tenant_id; let timeline_id = timeline.timeline_id; - let batch_size = batch.len(); + let batch_size = + u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning"); + + // I dislike naming of `available_permits` but it means current total amount of permits + // because permits can be added + assert!(batch_size as usize <= limit.available_permits()); debug!(%timeline_id, "evicting batch for timeline"); - async { - let results = timeline.evict_layers(storage, &batch, cancel.clone()).await; + let evict = { + let limit = limit.clone(); + let cancel = cancel.clone(); + async move { + let mut evicted_bytes = 0; + let mut evictions_failed = LayerCount::default(); - match results { - Err(e) => { - warn!("failed to evict batch: {:#}", e); - } - Ok(results) => { - assert_eq!(results.len(), batch.len()); - for (result, layer) in results.into_iter().zip(batch.iter()) { - let file_size = layer.layer_desc().file_size; - match result { - Some(Ok(())) => { - usage_assumed.add_available_bytes(file_size); - } - Some(Err(EvictionError::CannotEvictRemoteLayer)) => { - unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers") - } - Some(Err(EvictionError::FileNotFound)) => { - evictions_failed.file_sizes += file_size; - evictions_failed.count += 1; - } - Some(Err( - e @ EvictionError::LayerNotFound(_) - | e @ EvictionError::StatFailed(_), - )) => { - let e = utils::error::report_compact_sources(&e); - warn!(%layer, "failed to evict layer: {e}"); - evictions_failed.file_sizes += file_size; - evictions_failed.count += 1; - } - None => { - assert!(cancel.is_cancelled()); - return; + let Ok(_permit) = limit.acquire_many_owned(batch_size).await else { + // assume semaphore closing means cancelled + return (evicted_bytes, evictions_failed); + }; + + let results = timeline.evict_layers(&batch, &cancel).await; + + match results { + Ok(results) => { + assert_eq!(results.len(), batch.len()); + for (result, layer) in results.into_iter().zip(batch.iter()) { + let file_size = layer.layer_desc().file_size; + match result { + Some(Ok(())) => { + evicted_bytes += file_size; + } + Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { + evictions_failed.file_sizes += file_size; + evictions_failed.count += 1; + } + None => { + assert!(cancel.is_cancelled()); + } } } } + Err(e) => { + warn!("failed to evict batch: {:#}", e); + } } + (evicted_bytes, evictions_failed) } } - .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size)) - .await; + .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size)); - if cancel.is_cancelled() { + js.spawn(evict); + + // spwaning multiple thousands of these is essentially blocking, so give already spawned a + // chance of making progress + tokio::task::yield_now().await; + } + + let join_all = async move { + // After the evictions, `usage_assumed` is the post-eviction usage, + // according to internal accounting. + let mut usage_assumed = usage_pre; + let mut evictions_failed = LayerCount::default(); + + while let Some(res) = js.join_next().await { + match res { + Ok((evicted_bytes, failed)) => { + usage_assumed.add_available_bytes(evicted_bytes); + evictions_failed.file_sizes += failed.file_sizes; + evictions_failed.count += failed.count; + } + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { /* already logged */ } + Err(je) => tracing::error!("unknown JoinError: {je:?}"), + } + } + (usage_assumed, evictions_failed) + }; + + let (usage_assumed, evictions_failed) = tokio::select! { + tuple = join_all => { tuple }, + _ = cancel.cancelled() => { + // close the semaphore to stop any pending acquires + limit.close(); return Ok(IterationOutcome::Cancelled); } - } + }; Ok(IterationOutcome::Finished(IterationOutcomeFinished { before: usage_pre, @@ -441,7 +489,7 @@ pub async fn disk_usage_eviction_task_iteration_impl( #[derive(Clone)] struct EvictionCandidate { timeline: Arc, - layer: Arc, + layer: Arc, last_activity_ts: SystemTime, } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 917525dfff..71d7ec7bcc 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -410,6 +410,12 @@ pub(crate) struct LayerE { /// for it. version: AtomicUsize, have_remote_client: bool, + + /// Allow subscribing to when the layer actually gets evicted. + /// + /// This might never come unless eviction called periodically. + #[cfg(test)] + evicted: tokio::sync::Notify, } impl std::fmt::Display for LayerE { @@ -418,6 +424,12 @@ impl std::fmt::Display for LayerE { } } +impl std::fmt::Debug for LayerE { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + impl AsLayerDesc for LayerE { fn layer_desc(&self) -> &PersistentLayerDesc { &self.desc @@ -491,6 +503,8 @@ impl LayerE { wanted_evicted: AtomicBool::new(false), inner: Default::default(), version: AtomicUsize::new(0), + #[cfg(test)] + evicted: tokio::sync::Notify::default(), } } @@ -522,6 +536,8 @@ impl LayerE { wanted_evicted: AtomicBool::new(false), inner: tokio::sync::Mutex::new(Some(ResidentOrWantedEvicted::Resident(inner))), version: AtomicUsize::new(0), + #[cfg(test)] + evicted: tokio::sync::Notify::default(), } }); @@ -577,6 +593,15 @@ impl LayerE { } } + #[cfg(test)] + pub(crate) fn wait_evicted(&self) -> impl std::future::Future + '_ { + // for this to be actually useful, we must be first able to check some status, otherwise + // we could wait here for next eviction. + // + // states => (resident wanted_evicted evicted|wanted_evicted evicted resident)* wanted_garbage_collected? dropped + self.evicted.notified() + } + /// Delete the layer file when the `self` gets dropped, also schedule a remote index upload /// then perhaps. pub(crate) fn garbage_collect(&self) { @@ -957,7 +982,12 @@ impl LayerE { } }); - match capture_mtime_and_delete.await { + let res = capture_mtime_and_delete.await; + + #[cfg(test)] + this.evicted.notify_waiters(); + + match res { Ok(Ok(local_layer_mtime)) => { let duration = std::time::SystemTime::now().duration_since(local_layer_mtime); @@ -1004,6 +1034,7 @@ impl LayerE { } } +#[derive(Debug, PartialEq)] pub(crate) enum NeedsDownload { NotFound, NotFile, @@ -1060,8 +1091,8 @@ impl std::fmt::Debug for ResidentLayer { } impl ResidentLayer { - pub(crate) fn local_path(&self) -> &std::path::Path { - &self.owner.path + pub(crate) fn drop_eviction_guard(self) -> Arc { + self.into() } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 39e36a67d4..d8aa83e63b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1043,14 +1043,13 @@ impl Timeline { let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None); }; - let Some(remote_layer) = layer.downcast_remote_layer() else { - return Ok(Some(false)); - }; + if self.remote_client.is_none() { return Ok(Some(false)); } - self.download_remote_layer(remote_layer).await?; + layer.guard_against_eviction(true).await?; + Ok(Some(true)) } @@ -1060,6 +1059,11 @@ impl Timeline { let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None); }; + + let Ok(local_layer) = local_layer.guard_against_eviction(false).await else { return Ok(Some(false)); }; + + let local_layer: Arc = local_layer.into(); + let remote_client = self .remote_client .as_ref() @@ -1067,7 +1071,7 @@ impl Timeline { let cancel = CancellationToken::new(); let results = self - .evict_layer_batch(remote_client, &[local_layer], cancel) + .evict_layer_batch(remote_client, &[local_layer], &cancel) .await?; assert_eq!(results.len(), 1); let result: Option> = results.into_iter().next().unwrap(); @@ -1079,31 +1083,22 @@ impl Timeline { } /// Evict a batch of layers. - /// - /// GenericRemoteStorage reference is required as a (witness)[witness_article] for "remote storage is configured." - /// - /// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html pub(crate) async fn evict_layers( &self, - _: &GenericRemoteStorage, - layers_to_evict: &[Arc], - cancel: CancellationToken, + layers_to_evict: &[Arc], + cancel: &CancellationToken, ) -> anyhow::Result>>> { - let remote_client = self.remote_client.clone().expect( - "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient", - ); + let remote_client = self + .remote_client + .as_ref() + .context("timeline must have RemoteTimelineClient")?; - self.evict_layer_batch(&remote_client, layers_to_evict, cancel) + self.evict_layer_batch(remote_client, layers_to_evict, &cancel) .await } /// Evict multiple layers at once, continuing through errors. /// - /// Try to evict the given `layers_to_evict` by - /// - /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object. - /// 2. Deleting the now unreferenced layer file from disk. - /// /// The `remote_client` should be this timeline's `self.remote_client`. /// We make the caller provide it so that they are responsible for handling the case /// where someone wants to evict the layer but no remote storage is configured. @@ -1112,17 +1107,15 @@ impl Timeline { /// If `Err()` is returned, no eviction was attempted. /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`. /// Meaning of each `result[i]`: - /// - `Some(Err(...))` if layer replacement failed for an unexpected reason - /// - `Some(Ok(true))` if everything went well. - /// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.: - /// - evictee was not yet downloaded + /// - `Some(Err(...))` if layer replacement failed for some reason /// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks) + /// - `Some(Ok(()))` if everything went well. /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`. async fn evict_layer_batch( &self, remote_client: &Arc, - layers_to_evict: &[Arc], - cancel: CancellationToken, + layers_to_evict: &[Arc], + cancel: &CancellationToken, ) -> anyhow::Result>>> { // ensure that the layers have finished uploading // (don't hold the layer_removal_cs while we do it, we're not removing anything yet) @@ -1132,7 +1125,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; // now lock out layer removal (compaction, gc, timeline deletion) - let layer_removal_guard = self.layer_removal_cs.lock().await; + let _layer_removal_guard = self.layer_removal_cs.lock().await; { // to avoid racing with detach and delete_timeline @@ -1144,145 +1137,49 @@ impl Timeline { } // start the batch update - let mut guard = self.layers.write().await; let mut results = Vec::with_capacity(layers_to_evict.len()); - - for l in layers_to_evict.iter() { - let res = if cancel.is_cancelled() { - None - } else { - Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard)) - }; - results.push(res); + for _ in 0..layers_to_evict.len() { + results.push(None); } - // commit the updates & release locks - drop_wlock(guard); - drop(layer_removal_guard); + let mut js = tokio::task::JoinSet::new(); + + for (i, l) in layers_to_evict.into_iter().enumerate() { + js.spawn({ + let l = l.to_owned(); + async move { (i, l.evict_and_wait().await) } + }); + } + + let join = async { + while let Some(next) = js.join_next().await { + match next { + Ok((i, res)) => results[i] = Some(res), + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { /* already logged */ } + Err(je) => tracing::error!("unknown JoinError: {je:?}"), + } + } + }; + + tokio::select! { + _ = cancel.cancelled() => {}, + _ = join => {} + } assert_eq!(results.len(), layers_to_evict.len()); Ok(results) } - - fn evict_layer_batch_impl( - &self, - _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, - local_layer: &Arc, - layer_mgr: &mut LayerManager, - ) -> Result<(), EvictionError> { - if local_layer.is_remote_layer() { - return Err(EvictionError::CannotEvictRemoteLayer); - } - - let layer_file_size = local_layer.layer_desc().file_size; - - let local_layer_mtime = local_layer - .local_path() - .expect("local layer should have a local path") - .metadata() - // when the eviction fails because we have already deleted the layer in compaction for - // example, a NotFound error bubbles up from here. - .map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - EvictionError::FileNotFound - } else { - EvictionError::StatFailed(e) - } - })? - .modified() - .map_err(EvictionError::StatFailed)?; - - let local_layer_residence_duration = - match SystemTime::now().duration_since(local_layer_mtime) { - Err(e) => { - warn!(layer = %local_layer, "layer mtime is in the future: {}", e); - None - } - Ok(delta) => Some(delta), - }; - - let layer_metadata = LayerFileMetadata::new(layer_file_size); - - let new_remote_layer = Arc::new(match local_layer.filename() { - LayerFileName::Image(image_name) => RemoteLayer::new_img( - self.tenant_id, - self.timeline_id, - &image_name, - &layer_metadata, - local_layer - .access_stats() - .clone_for_residence_change(LayerResidenceStatus::Evicted), - ), - LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( - self.tenant_id, - self.timeline_id, - &delta_name, - &layer_metadata, - local_layer - .access_stats() - .clone_for_residence_change(LayerResidenceStatus::Evicted), - ), - }); - - assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); - - layer_mgr - .replace_and_verify(local_layer.clone(), new_remote_layer) - .map_err(EvictionError::LayerNotFound)?; - - if let Err(e) = local_layer.delete_resident_layer_file() { - // this should never happen, because of layer_removal_cs usage and above stat - // access for mtime - error!("failed to remove layer file on evict after replacement: {e:#?}"); - } - // Always decrement the physical size gauge, even if we failed to delete the file. - // Rationale: we already replaced the layer with a remote layer in the layer map, - // and any subsequent download_remote_layer will - // 1. overwrite the file on disk and - // 2. add the downloaded size to the resident size gauge. - // - // If there is no re-download, and we restart the pageserver, then load_layer_map - // will treat the file as a local layer again, count it towards resident size, - // and it'll be like the layer removal never happened. - // The bump in resident size is perhaps unexpected but overall a robust behavior. - self.metrics - .resident_physical_size_gauge - .sub(layer_file_size); - - self.metrics.evictions.inc(); - - if let Some(delta) = local_layer_residence_duration { - self.metrics - .evictions_with_low_residence_duration - .read() - .unwrap() - .observe(delta); - info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period"); - } else { - info!(layer=%local_layer, "evicted layer after unknown residence period"); - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] pub(crate) enum EvictionError { - #[error("cannot evict a remote layer")] - CannotEvictRemoteLayer, - /// Most likely the to-be evicted layer has been deleted by compaction or gc which use the same - /// locks, so they got to execute before the eviction. - #[error("file backing the layer has been removed already")] - FileNotFound, - #[error("stat failed")] - StatFailed(#[source] std::io::Error), - /// In practice, this can be a number of things, but lets assume it means only this. - /// - /// This case includes situations such as the Layer was evicted and redownloaded in between, - /// because the file existed before an replacement attempt was made but now the Layers are - /// different objects in memory. - #[error("layer was no longer part of LayerMap")] - LayerNotFound(#[source] anyhow::Error), + #[error("layer was already evicted")] + NotFound, + + /// Evictions must always lose to downloads in races, and this time it happened. + #[error("layer was downloaded instead")] + Downloaded, } /// Number of times we will compute partition within a checkpoint distance. @@ -2073,7 +1970,7 @@ impl Timeline { } } - async fn find_layer(&self, layer_file_name: &str) -> Option> { + async fn find_layer(&self, layer_file_name: &str) -> Option> { let guard = self.layers.read().await; for historic_layer in guard.layer_map().iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); @@ -4465,15 +4362,15 @@ impl Timeline { } } -pub struct DiskUsageEvictionInfo { +pub(crate) struct DiskUsageEvictionInfo { /// Timeline's largest layer (remote or resident) pub max_layer_size: Option, /// Timeline's resident layers pub resident_layers: Vec, } -pub struct LocalLayerInfoForDiskUsageEviction { - pub layer: Arc, +pub(crate) struct LocalLayerInfoForDiskUsageEviction { + pub layer: Arc, pub last_activity_ts: SystemTime, } @@ -4483,8 +4380,14 @@ impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction { // having to allocate a string to this is bad, but it will rarely be formatted let ts = chrono::DateTime::::from(self.last_activity_ts); let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true); + struct DisplayIsDebug<'a, T>(&'a T); + impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } f.debug_struct("LocalLayerInfoForDiskUsageEviction") - .field("layer", &self.layer) + .field("layer", &DisplayIsDebug(&self.layer)) .field("last_activity", &ts) .finish() } @@ -4511,9 +4414,7 @@ impl Timeline { let l = guard.get_from_desc(&l); - if l.is_remote_layer() { - continue; - } + let Ok(l) = l.guard_against_eviction(false).await else { continue; }; let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| { // We only use this fallback if there's an implementation error. @@ -4523,7 +4424,8 @@ impl Timeline { }); resident_layers.push(LocalLayerInfoForDiskUsageEviction { - layer: l, + // we explicitly don't want to keep this layer downloaded + layer: l.drop_eviction_guard(), last_activity_ts, }); } @@ -4675,9 +4577,7 @@ mod tests { use utils::{id::TimelineId, lsn::Lsn}; - use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer}; - - use super::{EvictionError, Timeline}; + use crate::tenant::{harness::TenantHarness, storage_layer::LayerE, Timeline}; #[tokio::test] async fn two_layer_eviction_attempts_at_the_same_time() { @@ -4711,22 +4611,25 @@ mod tests { .expect("just configured this"); let layer = find_some_layer(&timeline).await; + let layer = layer.guard_against_eviction(false).await.unwrap(); + let layer = layer.drop_eviction_guard(); let cancel = tokio_util::sync::CancellationToken::new(); let batch = [layer]; let first = { - let cancel = cancel.clone(); + let cancel = cancel.child_token(); async { + let cancel = cancel; timeline - .evict_layer_batch(&rc, &batch, cancel) + .evict_layer_batch(&rc, &batch, &cancel) .await .unwrap() } }; let second = async { timeline - .evict_layer_batch(&rc, &batch, cancel) + .evict_layer_batch(&rc, &batch, &cancel) .await .unwrap() }; @@ -4735,88 +4638,20 @@ mod tests { let (first, second) = (only_one(first), only_one(second)); + assert_eq!(batch[0].needs_download_blocking().unwrap(), None); + + let layer: Arc = batch[0].clone(); + let mut evicted = std::pin::pin!(layer.wait_evicted()); + assert_eq!(futures::future::poll_immediate(&mut evicted).await, None); + + // both seemingly succeed, but only one will actually evict match (first, second) { - (Ok(()), Err(EvictionError::FileNotFound)) - | (Err(EvictionError::FileNotFound), Ok(())) => { - // one of the evictions gets to do it, - // other one gets FileNotFound. all is good. - } + (Ok(()), Ok(())) => {} other => unreachable!("unexpected {:?}", other), } - } - #[tokio::test] - async fn layer_eviction_aba_fails() { - let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap(); - - let remote_storage = { - // this is never used for anything, because of how the create_test_timeline works, but - // it is with us in spirit and a Some. - use remote_storage::{GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind}; - let path = harness.conf.workdir.join("localfs"); - std::fs::create_dir_all(&path).unwrap(); - let config = RemoteStorageConfig { - max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(), - max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(), - storage: RemoteStorageKind::LocalFs(path), - }; - GenericRemoteStorage::from_config(&config).unwrap() - }; - - let ctx = any_context(); - let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap(); - let timeline = tenant - .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) - .await - .unwrap(); - - let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered(); - - let rc = timeline.remote_client.clone().unwrap(); - - // TenantHarness allows uploads to happen given GenericRemoteStorage is configured - let layer = find_some_layer(&timeline).await; - - let cancel = tokio_util::sync::CancellationToken::new(); - let batch = [layer]; - - let first = { - let cancel = cancel.clone(); - async { - timeline - .evict_layer_batch(&rc, &batch, cancel) - .await - .unwrap() - } - }; - - // lets imagine this is stuck somehow, still referencing the original `Arc` - let second = { - let cancel = cancel.clone(); - async { - timeline - .evict_layer_batch(&rc, &batch, cancel) - .await - .unwrap() - } - }; - - // while it's stuck, we evict and end up redownloading it - only_one(first.await).expect("eviction succeeded"); - - let layer = find_some_layer(&timeline).await; - let layer = layer.downcast_remote_layer().unwrap(); - timeline.download_remote_layer(layer).await.unwrap(); - - let res = only_one(second.await); - - assert!( - matches!(res, Err(EvictionError::LayerNotFound(_))), - "{res:?}" - ); - - // no more specific asserting, outside of preconds this is the only valid replacement - // failure + // after eviction has been requested, we will eventually evict + evicted.await; } fn any_context() -> crate::context::RequestContext { @@ -4833,7 +4668,7 @@ mod tests { .expect("no cancellation") } - async fn find_some_layer(timeline: &Timeline) -> Arc { + async fn find_some_layer(timeline: &Timeline) -> Arc { let layers = timeline.layers.read().await; let desc = layers .layer_map() diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 3e407dda57..377aad1b2e 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -29,7 +29,6 @@ use crate::{ task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, - storage_layer::PersistentLayer, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, }, @@ -194,15 +193,16 @@ impl Timeline { // NB: all the checks can be invalidated as soon as we release the layer map lock. // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. - let candidates: Vec> = { + let candidates: Vec<_> = { let guard = self.layers.read().await; let layers = guard.layer_map(); let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { let hist_layer = guard.get_from_desc(&hist_layer); - if hist_layer.is_remote_layer() { - continue; - } + + // funny: this is the best way to get local layers is to lock them into + // memory for the duration of eviction + let Ok(guard) = hist_layer.guard_against_eviction(false).await else { continue; }; let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| { // We only use this fallback if there's an implementation error. @@ -233,7 +233,7 @@ impl Timeline { } }; if no_activity_for > p.threshold { - candidates.push(hist_layer) + candidates.push(guard.drop_eviction_guard()) } } candidates @@ -252,7 +252,7 @@ impl Timeline { }; let results = match self - .evict_layer_batch(remote_client, &candidates[..], cancel.clone()) + .evict_layer_batch(remote_client, &candidates, cancel) .await { Err(pre_err) => { @@ -263,7 +263,8 @@ impl Timeline { Ok(results) => results, }; assert_eq!(results.len(), candidates.len()); - for (l, result) in candidates.iter().zip(results) { + drop(candidates); + for result in results { match result { None => { stats.skipped_for_shutdown += 1; @@ -271,20 +272,10 @@ impl Timeline { Some(Ok(())) => { stats.evicted += 1; } - Some(Err(EvictionError::CannotEvictRemoteLayer)) => { - stats.not_evictable += 1; - } - Some(Err(EvictionError::FileNotFound)) => { + Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { // compaction/gc removed the file while we were waiting on layer_removal_cs stats.not_evictable += 1; } - Some(Err( - e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_), - )) => { - let e = utils::error::report_compact_sources(&e); - warn!(layer = %l, "failed to evict layer: {e}"); - stats.not_evictable += 1; - } } } if stats.candidates == stats.not_evictable {