From bf4e708646388ff1aee8eafe78c108693a53d5a9 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 16 Jan 2024 10:29:26 +0000 Subject: [PATCH] pageserver: eviction for secondary mode tenants (#6225) Follows #6123 Closes: https://github.com/neondatabase/neon/issues/5342 The approach here is to avoid using `Layer` from secondary tenants, and instead make the eviction types (e.g. `EvictionCandidate`) have a variant that carries a Layer for attached tenants, and a different variant for secondary tenants. Other changes: - EvictionCandidate no longer carries a `Timeline`: this was only used for providing a witness reference to remote timeline client. - The types for returning eviction candidates are all in disk_usage_eviction_task.rs now, whereas some of them were in timeline.rs before. - The EvictionCandidate type replaces LocalLayerInfoForDiskUsageEviction type, which was basically the same thing. --- pageserver/src/bin/pageserver.rs | 1 + pageserver/src/disk_usage_eviction_task.rs | 246 +++++++++++++----- pageserver/src/http/routes.rs | 5 +- pageserver/src/tenant/secondary.rs | 77 +++++- pageserver/src/tenant/secondary/downloader.rs | 43 +++ pageserver/src/tenant/storage_layer/layer.rs | 16 +- pageserver/src/tenant/timeline.rs | 86 ++---- .../src/tenant/timeline/eviction_task.rs | 14 +- .../regress/test_disk_usage_eviction.py | 202 +++++++++++--- 9 files changed, 500 insertions(+), 190 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 621ad050f4..15e3359c06 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -527,6 +527,7 @@ fn start_pageserver( conf, remote_storage.clone(), disk_usage_eviction_state.clone(), + tenant_manager.clone(), background_jobs_barrier.clone(), )?; } diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 7d74da1196..4cb20a1bb1 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -47,21 +47,24 @@ use std::{ }; use anyhow::Context; -use camino::Utf8Path; +use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use serde::{Deserialize, Serialize}; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn, Instrument}; -use utils::completion; use utils::serde_percent::Percent; +use utils::{completion, id::TimelineId}; use crate::{ config::PageServerConf, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ self, - storage_layer::{AsLayerDesc, EvictionError, Layer}, + mgr::TenantManager, + remote_timeline_client::LayerFileMetadata, + secondary::SecondaryTenant, + storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName}, Timeline, }, }; @@ -125,6 +128,7 @@ pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, state: Arc, + tenant_manager: Arc, background_jobs_barrier: completion::Barrier, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { @@ -150,8 +154,7 @@ pub fn launch_disk_usage_global_eviction_task( _ = background_jobs_barrier.wait() => { } }; - disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel) - .await; + disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await; Ok(()) }, ); @@ -164,7 +167,7 @@ async fn disk_usage_eviction_task( state: &State, task_config: &DiskUsageEvictionTaskConfig, storage: &GenericRemoteStorage, - 
tenants_dir: &Utf8Path, + tenant_manager: Arc, cancel: CancellationToken, ) { scopeguard::defer! { @@ -191,7 +194,7 @@ async fn disk_usage_eviction_task( state, task_config, storage, - tenants_dir, + &tenant_manager, &cancel, ) .await; @@ -226,15 +229,17 @@ async fn disk_usage_eviction_task_iteration( state: &State, task_config: &DiskUsageEvictionTaskConfig, storage: &GenericRemoteStorage, - tenants_dir: &Utf8Path, + tenant_manager: &Arc, cancel: &CancellationToken, ) -> anyhow::Result<()> { - let usage_pre = filesystem_level_usage::get(tenants_dir, task_config) + let tenants_dir = tenant_manager.get_conf().tenants_path(); + let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config) .context("get filesystem-level disk usage before evictions")?; let res = disk_usage_eviction_task_iteration_impl( state, storage, usage_pre, + tenant_manager, task_config.eviction_order, cancel, ) @@ -248,7 +253,7 @@ async fn disk_usage_eviction_task_iteration( } IterationOutcome::Finished(outcome) => { // Verify with statvfs whether we made any real progress - let after = filesystem_level_usage::get(tenants_dir, task_config) + let after = filesystem_level_usage::get(&tenants_dir, task_config) // It's quite unlikely to hit the error here. Keep the code simple and bail out. .context("get filesystem-level disk usage after evictions")?; @@ -324,6 +329,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( state: &State, _storage: &GenericRemoteStorage, usage_pre: U, + tenant_manager: &Arc, eviction_order: EvictionOrder, cancel: &CancellationToken, ) -> anyhow::Result> { @@ -344,29 +350,29 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( "running disk usage based eviction due to pressure" ); - let candidates = match collect_eviction_candidates(eviction_order, cancel).await? { - EvictionCandidates::Cancelled => { - return Ok(IterationOutcome::Cancelled); - } - EvictionCandidates::Finished(partitioned) => partitioned, - }; + let candidates = + match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? 
{ + EvictionCandidates::Cancelled => { + return Ok(IterationOutcome::Cancelled); + } + EvictionCandidates::Finished(partitioned) => partitioned, + }; // Debug-log the list of candidates let now = SystemTime::now(); for (i, (partition, candidate)) in candidates.iter().enumerate() { let nth = i + 1; - let desc = candidate.layer.layer_desc(); let total_candidates = candidates.len(); - let size = desc.file_size; + let size = candidate.layer.get_file_size(); let rel = candidate.relative_last_activity; debug!( "cand {nth}/{total_candidates}: size={size}, rel_last_activity={rel}, no_access_for={}us, partition={partition:?}, {}/{}/{}", now.duration_since(candidate.last_activity_ts) .unwrap() .as_micros(), - desc.tenant_shard_id, - desc.timeline_id, - candidate.layer, + candidate.layer.get_tenant_shard_id(), + candidate.layer.get_timeline_id(), + candidate.layer.get_name(), ); } @@ -398,7 +404,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( warned = Some(usage_planned); } - usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size); + usage_planned.add_available_bytes(candidate.layer.get_file_size()); evicted_amount += 1; } @@ -463,19 +469,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( continue; }; - js.spawn(async move { - let rtc = candidate.timeline.remote_client.as_ref().expect( - "holding the witness, all timelines must have a remote timeline client", - ); - let file_size = candidate.layer.layer_desc().file_size; - candidate - .layer - .evict_and_wait(rtc) - .await - .map(|()| file_size) - .map_err(|e| (file_size, e)) - }); + match candidate.layer { + EvictionLayer::Attached(layer) => { + let file_size = layer.layer_desc().file_size; + js.spawn(async move { + layer + .evict_and_wait() + .await + .map(|()| file_size) + .map_err(|e| (file_size, e)) + }); + } + EvictionLayer::Secondary(layer) => { + let file_size = layer.metadata.file_size(); + let tenant_manager = tenant_manager.clone(); + js.spawn(async move { + layer + .secondary_tenant + .evict_layer(tenant_manager.get_conf(), layer.timeline_id, layer.name) + .await; + Ok(file_size) + }); + } + } tokio::task::yield_now().await; } @@ -502,11 +519,100 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl( } #[derive(Clone)] -struct EvictionCandidate { - timeline: Arc, - layer: Layer, - last_activity_ts: SystemTime, - relative_last_activity: finite_f32::FiniteF32, +pub(crate) struct EvictionSecondaryLayer { + pub(crate) secondary_tenant: Arc, + pub(crate) timeline_id: TimelineId, + pub(crate) name: LayerFileName, + pub(crate) metadata: LayerFileMetadata, +} + +/// Full [`Layer`] objects are specific to tenants in attached mode. This type is a layer +/// of indirection to store either a `Layer`, or a reference to a secondary tenant and a layer name. 
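As a quick orientation to the indirection described in the doc comment above, here is a condensed, stand-alone sketch. The `AttachedLayer`/`SecondaryLayer` structs and the bodies of the two `evict` branches are hypothetical stand-ins, not the pageserver's real `Layer`/`EvictionSecondaryLayer` types; the point is only the shape of the dispatch, mirroring the `match candidate.layer` in `disk_usage_eviction_task_iteration_impl` above: one candidate list carries both kinds of layer, and the code branches on the variant only where the eviction is actually carried out.

    // Hypothetical stand-ins; the real patch stores `Layer` and `EvictionSecondaryLayer`.
    struct AttachedLayer { name: String, file_size: u64 }
    struct SecondaryLayer { name: String, file_size: u64 }

    enum EvictionLayer {
        Attached(AttachedLayer),
        Secondary(SecondaryLayer),
    }

    impl EvictionLayer {
        // One planning loop treats both location kinds uniformly and only branches
        // at the point where the eviction is executed.
        fn evict(self) -> u64 {
            match self {
                Self::Attached(l) => {
                    // attached: the layer evicts itself (evict_and_wait in the patch)
                    println!("evicting attached layer {}", l.name);
                    l.file_size
                }
                Self::Secondary(l) => {
                    // secondary: delete the local file and update secondary state
                    println!("evicting secondary layer {}", l.name);
                    l.file_size
                }
            }
        }
    }

    fn main() {
        let candidates = vec![
            EvictionLayer::Attached(AttachedLayer { name: "delta-1".into(), file_size: 8192 }),
            EvictionLayer::Secondary(SecondaryLayer { name: "image-1".into(), file_size: 4096 }),
        ];
        let freed: u64 = candidates.into_iter().map(EvictionLayer::evict).sum();
        println!("freed {freed} bytes");
    }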
+#[derive(Clone)] +pub(crate) enum EvictionLayer { + Attached(Layer), + #[allow(dead_code)] + Secondary(EvictionSecondaryLayer), +} + +impl From for EvictionLayer { + fn from(value: Layer) -> Self { + Self::Attached(value) + } +} + +impl EvictionLayer { + pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId { + match self { + Self::Attached(l) => &l.layer_desc().tenant_shard_id, + Self::Secondary(sl) => sl.secondary_tenant.get_tenant_shard_id(), + } + } + + pub(crate) fn get_timeline_id(&self) -> &TimelineId { + match self { + Self::Attached(l) => &l.layer_desc().timeline_id, + Self::Secondary(sl) => &sl.timeline_id, + } + } + + pub(crate) fn get_name(&self) -> LayerFileName { + match self { + Self::Attached(l) => l.layer_desc().filename(), + Self::Secondary(sl) => sl.name.clone(), + } + } + + pub(crate) fn get_file_size(&self) -> u64 { + match self { + Self::Attached(l) => l.layer_desc().file_size, + Self::Secondary(sl) => sl.metadata.file_size(), + } + } +} + +#[derive(Clone)] +pub(crate) struct EvictionCandidate { + pub(crate) layer: EvictionLayer, + pub(crate) last_activity_ts: SystemTime, + pub(crate) relative_last_activity: finite_f32::FiniteF32, +} + +impl std::fmt::Display for EvictionLayer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::Attached(l) => l.fmt(f), + Self::Secondary(sl) => { + write!(f, "{}/{}", sl.timeline_id, sl.name) + } + } + } +} + +pub(crate) struct DiskUsageEvictionInfo { + /// Timeline's largest layer (remote or resident) + pub max_layer_size: Option, + /// Timeline's resident layers + pub resident_layers: Vec, +} + +impl std::fmt::Debug for EvictionCandidate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it + // having to allocate a string to this is bad, but it will rarely be formatted + let ts = chrono::DateTime::::from(self.last_activity_ts); + let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true); + struct DisplayIsDebug<'a, T>(&'a T); + impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } + f.debug_struct("LocalLayerInfoForDiskUsageEviction") + .field("layer", &DisplayIsDebug(&self.layer)) + .field("last_activity", &ts) + .finish() + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -623,6 +729,7 @@ enum EvictionCandidates { /// - tenant B 1 layer /// - tenant C 8 layers async fn collect_eviction_candidates( + tenant_manager: &Arc, eviction_order: EvictionOrder, cancel: &CancellationToken, ) -> anyhow::Result { @@ -631,13 +738,16 @@ async fn collect_eviction_candidates( .await .context("get list of tenants")?; + // TODO: avoid listing every layer in every tenant: this loop can block the executor, + // and the resulting data structure can be huge. 
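The loop below (see the sort and cumsum logic further down in this function) orders each attached tenant's resident layers most-recently-used first and then partitions them around the tenant's min_resident_size; secondary locations skip this step and, as noted further down, always land in the Above partition. A condensed, stand-alone sketch of that per-tenant partitioning, with a hypothetical `Candidate` type standing in for `EvictionCandidate` and the boundary handling simplified:

    use std::time::{Duration, SystemTime};

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
    enum Partition {
        // Ord puts Above before Below, so Above-partition layers are evicted first.
        Above,
        Below,
    }

    // Hypothetical stand-in for EvictionCandidate.
    struct Candidate {
        last_activity_ts: SystemTime,
        file_size: u64,
    }

    fn partition_tenant(
        mut layers: Vec<Candidate>,
        min_resident_size: u64,
    ) -> Vec<(Partition, Candidate)> {
        // Most recently used first: layers we would rather keep resident come first.
        layers.sort_unstable_by_key(|c| std::cmp::Reverse(c.last_activity_ts));
        let mut cumsum: u128 = 0;
        layers
            .into_iter()
            .map(|c| {
                // Once the more-recently-used layers already cover min_resident_size,
                // the remaining (older) layers become the preferred eviction victims.
                let p = if cumsum > u128::from(min_resident_size) {
                    Partition::Above
                } else {
                    Partition::Below
                };
                cumsum += u128::from(c.file_size);
                (p, c)
            })
            .collect()
    }

    fn main() {
        let now = SystemTime::now();
        let layers = (0..4u64)
            .map(|i| Candidate {
                last_activity_ts: now - Duration::from_secs(i * 60),
                file_size: 1024,
            })
            .collect();
        for (p, c) in partition_tenant(layers, 2048) {
            println!("{:?}: {} bytes", p, c.file_size);
        }
    }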
+ // (https://github.com/neondatabase/neon/issues/6224) let mut candidates = Vec::new(); - for (tenant_id, _state, _gen) in &tenants { + for (tenant_id, _state, _gen) in tenants { if cancel.is_cancelled() { return Ok(EvictionCandidates::Cancelled); } - let tenant = match tenant::mgr::get_tenant(*tenant_id, true) { + let tenant = match tenant::mgr::get_tenant(tenant_id, true) { Ok(tenant) => tenant, Err(e) => { // this can happen if tenant has lifecycle transition after we fetched it @@ -665,11 +775,7 @@ async fn collect_eviction_candidates( } let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); - tenant_candidates.extend( - info.resident_layers - .into_iter() - .map(|layer_infos| (tl.clone(), layer_infos)), - ); + tenant_candidates.extend(info.resident_layers.into_iter()); max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0)); if cancel.is_cancelled() { @@ -707,7 +813,7 @@ async fn collect_eviction_candidates( // Sort layers most-recently-used first, then partition by // cumsum above/below min_resident_size. tenant_candidates - .sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts)); + .sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts)); let mut cumsum: i128 = 0; // keeping the -1 or not decides if every tenant should lose their least recently accessed @@ -741,12 +847,10 @@ async fn collect_eviction_candidates( .unwrap_or(1); let divider = total as f32; - for (i, (timeline, layer_info)) in tenant_candidates.into_iter().enumerate() { - let file_size = layer_info.file_size(); - + for (i, mut candidate) in tenant_candidates.into_iter().enumerate() { // as we iterate this reverse sorted list, the most recently accessed layer will always // be 1.0; this is for us to evict it last. - let relative_last_activity = if matches!( + candidate.relative_last_activity = if matches!( eviction_order, EvictionOrder::RelativeAccessed { .. } ) { @@ -761,22 +865,46 @@ async fn collect_eviction_candidates( finite_f32::FiniteF32::ZERO }; - let candidate = EvictionCandidate { - timeline, - last_activity_ts: layer_info.last_activity_ts, - layer: layer_info.layer, - relative_last_activity, - }; let partition = if cumsum > min_resident_size as i128 { MinResidentSizePartition::Above } else { MinResidentSizePartition::Below }; + cumsum += i128::from(candidate.layer.get_file_size()); candidates.push((partition, candidate)); - cumsum += i128::from(file_size); } } + // Note: the same tenant ID might be hit twice, if it transitions from attached to + // secondary while we run. That is okay: when we eventually try and run the eviction, + // the `Gate` on the object will ensure that whichever one has already been shut down + // will not delete anything. + + let mut secondary_tenants = Vec::new(); + tenant_manager.foreach_secondary_tenants( + |_tenant_shard_id: &TenantShardId, state: &Arc| { + secondary_tenants.push(state.clone()); + }, + ); + + for secondary_tenant in secondary_tenants { + let mut layer_info = secondary_tenant.get_layers_for_eviction(); + + layer_info + .resident_layers + .sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts)); + + candidates.extend(layer_info.resident_layers.into_iter().map(|candidate| { + ( + // Secondary locations' layers are always considered above the min resident size, + // i.e. 
secondary locations are permitted to be trimmed to zero layers if all + // the layers have sufficiently old access times. + MinResidentSizePartition::Above, + candidate, + ) + })); + } + debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below, "as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first"); @@ -821,7 +949,7 @@ impl std::ops::Deref for TimelineKey { } /// A totally ordered f32 subset we can use with sorting functions. -mod finite_f32 { +pub(crate) mod finite_f32 { /// A totally ordered f32 subset we can use with sorting functions. #[derive(Clone, Copy, PartialEq)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c83020135b..07b0671537 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1655,12 +1655,13 @@ async fn disk_usage_eviction_run( ))); }; - let state = state.disk_usage_eviction_state.clone(); + let eviction_state = state.disk_usage_eviction_state.clone(); let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( - &state, + &eviction_state, storage, usage, + &state.tenant_manager, config.eviction_order, &cancel, ) diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 2331447266..4a13814154 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -3,22 +3,31 @@ pub mod heatmap; mod heatmap_uploader; mod scheduler; -use std::sync::Arc; +use std::{sync::Arc, time::SystemTime}; -use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; +use crate::{ + config::PageServerConf, + disk_usage_eviction_task::DiskUsageEvictionInfo, + task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, + virtual_file::MaybeFatalIo, +}; use self::{ downloader::{downloader_task, SecondaryDetail}, heatmap_uploader::heatmap_uploader_task, }; -use super::{config::SecondaryLocationConfig, mgr::TenantManager}; +use super::{ + config::SecondaryLocationConfig, mgr::TenantManager, + span::debug_assert_current_span_has_tenant_id, storage_layer::LayerFileName, +}; use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use tokio_util::sync::CancellationToken; -use utils::{completion::Barrier, sync::gate::Gate}; +use tracing::instrument; +use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate}; enum DownloadCommand { Download(TenantShardId), @@ -107,9 +116,67 @@ impl SecondaryTenant { self.detail.lock().unwrap().config = config.clone(); } - fn get_tenant_shard_id(&self) -> &TenantShardId { + pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId { &self.tenant_shard_id } + + pub(crate) fn get_layers_for_eviction(self: &Arc) -> DiskUsageEvictionInfo { + self.detail.lock().unwrap().get_layers_for_eviction(self) + } + + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))] + pub(crate) async fn evict_layer( + &self, + conf: &PageServerConf, + timeline_id: TimelineId, + name: LayerFileName, + ) { + debug_assert_current_span_has_tenant_id(); + + let _guard = match self.gate.enter() { + Ok(g) => g, + Err(_) => { + tracing::debug!("Dropping layer evictions, secondary tenant shutting down",); + return; + } + }; + + let now = SystemTime::now(); + + let path = conf + .timeline_path(&self.tenant_shard_id, &timeline_id) + .join(name.file_name()); + + // We tolerate ENOENT, because between planning eviction and executing + // it, the 
secondary downloader could have seen an updated heatmap that + // resulted in a layer being deleted. + // Other local I/O errors are process-fatal: these should never happen. + tokio::fs::remove_file(path) + .await + .or_else(fs_ext::ignore_not_found) + .fatal_err("Deleting layer during eviction"); + + // Update the timeline's state. This does not have to be synchronized with + // the download process, because: + // - If downloader is racing with us to remove a file (e.g. because it is + // removed from heatmap), then our mutual .remove() operations will both + // succeed. + // - If downloader is racing with us to download the object (this would require + // multiple eviction iterations to race with multiple download iterations), then + // if we remove it from the state, the worst that happens is the downloader + // downloads it again before re-inserting, or we delete the file but it remains + // in the state map (in which case it will be downloaded if this secondary + // tenant transitions to attached and tries to access it) + // + // The important assumption here is that the secondary timeline state does not + // have to 100% match what is on disk, because it's a best-effort warming + // of the cache. + let mut detail = self.detail.lock().unwrap(); + if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) { + timeline_detail.on_disk_layers.remove(&name); + timeline_detail.evicted_at.insert(name, now); + } + } } /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads, diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 2a79c406cf..702c0b1ec1 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -8,6 +8,9 @@ use std::{ use crate::{ config::PageServerConf, + disk_usage_eviction_task::{ + finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer, + }, metrics::SECONDARY_MODE, tenant::{ config::SecondaryLocationConfig, @@ -142,6 +145,46 @@ impl SecondaryDetail { timelines: HashMap::new(), } } + + pub(super) fn get_layers_for_eviction( + &self, + parent: &Arc, + ) -> DiskUsageEvictionInfo { + let mut result = DiskUsageEvictionInfo { + max_layer_size: None, + resident_layers: Vec::new(), + }; + for (timeline_id, timeline_detail) in &self.timelines { + result + .resident_layers + .extend(timeline_detail.on_disk_layers.iter().map(|(name, ods)| { + EvictionCandidate { + layer: EvictionLayer::Secondary(EvictionSecondaryLayer { + secondary_tenant: parent.clone(), + timeline_id: *timeline_id, + name: name.clone(), + metadata: ods.metadata.clone(), + }), + last_activity_ts: ods.access_time, + relative_last_activity: finite_f32::FiniteF32::ZERO, + } + })); + } + result.max_layer_size = result + .resident_layers + .iter() + .map(|l| l.layer.get_file_size()) + .max(); + + tracing::debug!( + "eviction: secondary tenant {} found {} timelines, {} layers", + parent.get_tenant_shard_id(), + self.timelines.len(), + result.resident_layers.len() + ); + + result + } } struct PendingDownload { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 3f29e9f6a5..12af866810 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -15,7 +15,7 @@ use utils::sync::heavier_once_cell; use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; -use 
crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline}; +use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline}; use super::delta_layer::{self, DeltaEntry}; use super::image_layer; @@ -204,17 +204,14 @@ impl Layer { /// /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation /// of download-evict cycle on retry. - pub(crate) async fn evict_and_wait( - &self, - rtc: &RemoteTimelineClient, - ) -> Result<(), EvictionError> { - self.0.evict_and_wait(rtc).await + pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> { + self.0.evict_and_wait().await } /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload /// then. /// - /// On drop, this will cause a call to [`RemoteTimelineClient::schedule_deletion_of_unlinked`]. + /// On drop, this will cause a call to [`crate::tenant::remote_timeline_client::RemoteTimelineClient::schedule_deletion_of_unlinked`]. /// This means that the unlinking by [gc] or [compaction] must have happened strictly before /// the value this is called on gets dropped. /// @@ -606,10 +603,7 @@ impl LayerInner { /// Cancellation safe, however dropping the future and calling this method again might result /// in a new attempt to evict OR join the previously started attempt. - pub(crate) async fn evict_and_wait( - &self, - _: &RemoteTimelineClient, - ) -> Result<(), EvictionError> { + pub(crate) async fn evict_and_wait(&self) -> Result<(), EvictionError> { use tokio::sync::broadcast::error::RecvError; assert!(self.have_remote_client); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8b96c30522..52b2de843d 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -42,15 +42,6 @@ use std::{ ops::ControlFlow, }; -use crate::context::{ - AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, -}; -use crate::tenant::storage_layer::delta_layer::DeltaEntry; -use crate::tenant::storage_layer::{ - AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer, - LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, - ValueReconstructState, -}; use crate::tenant::tasks::BackgroundLoopKind; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ @@ -58,7 +49,22 @@ use crate::tenant::{ metadata::{save_metadata, TimelineMetadata}, par_fsync, }; +use crate::{ + context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder}, + disk_usage_eviction_task::DiskUsageEvictionInfo, +}; use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError}; +use crate::{ + disk_usage_eviction_task::finite_f32, + tenant::storage_layer::{ + AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer, + LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, + ValueReconstructState, + }, +}; +use crate::{ + disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry, +}; use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum}; @@ -1133,12 +1139,7 @@ impl Timeline { return Ok(None); }; - let rtc = self - .remote_client - .as_ref() - .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; - - match local_layer.evict_and_wait(rtc).await { + match local_layer.evict_and_wait().await { Ok(()) => 
Ok(Some(true)), Err(EvictionError::NotFound) => Ok(Some(false)), Err(EvictionError::Downloaded) => Ok(Some(false)), @@ -2102,7 +2103,7 @@ impl Timeline { let layer_file_names = eviction_info .resident_layers .iter() - .map(|l| l.layer.layer_desc().filename()) + .map(|l| l.layer.get_name()) .collect::>(); let decorated = match remote_client.get_layers_metadata(layer_file_names) { @@ -2120,7 +2121,7 @@ impl Timeline { .filter_map(|(layer, remote_info)| { remote_info.map(|remote_info| { HeatMapLayer::new( - layer.layer.layer_desc().filename(), + layer.layer.get_name(), IndexLayerMetadata::from(remote_info), layer.last_activity_ts, ) @@ -4423,43 +4424,6 @@ impl Timeline { } } -pub(crate) struct DiskUsageEvictionInfo { - /// Timeline's largest layer (remote or resident) - pub max_layer_size: Option, - /// Timeline's resident layers - pub resident_layers: Vec, -} - -pub(crate) struct LocalLayerInfoForDiskUsageEviction { - pub layer: Layer, - pub last_activity_ts: SystemTime, -} - -impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it - // having to allocate a string to this is bad, but it will rarely be formatted - let ts = chrono::DateTime::::from(self.last_activity_ts); - let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true); - struct DisplayIsDebug<'a, T>(&'a T); - impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } - } - f.debug_struct("LocalLayerInfoForDiskUsageEviction") - .field("layer", &DisplayIsDebug(&self.layer)) - .field("last_activity", &ts) - .finish() - } -} - -impl LocalLayerInfoForDiskUsageEviction { - pub fn file_size(&self) -> u64 { - self.layer.layer_desc().file_size - } -} - impl Timeline { /// Returns non-remote layers for eviction. pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { @@ -4493,9 +4457,10 @@ impl Timeline { SystemTime::now() }); - resident_layers.push(LocalLayerInfoForDiskUsageEviction { - layer: l.drop_eviction_guard(), + resident_layers.push(EvictionCandidate { + layer: l.drop_eviction_guard().into(), last_activity_ts, + relative_last_activity: finite_f32::FiniteF32::ZERO, }); } @@ -4652,11 +4617,6 @@ mod tests { .await .unwrap(); - let rtc = timeline - .remote_client - .clone() - .expect("just configured this"); - let layer = find_some_layer(&timeline).await; let layer = layer .keep_resident() @@ -4665,8 +4625,8 @@ mod tests { .expect("should had been resident") .drop_eviction_guard(); - let first = async { layer.evict_and_wait(&rtc).await }; - let second = async { layer.evict_and_wait(&rtc).await }; + let first = async { layer.evict_and_wait().await }; + let second = async { layer.evict_and_wait().await }; let (first, second) = tokio::join!(first, second); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index ea5f5f5fa7..86e36189d2 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -215,13 +215,10 @@ impl Timeline { // So, we just need to deal with this. 
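A side note on `relative_last_activity`: `get_local_layers_for_disk_usage_eviction` above fills it with the placeholder `FiniteF32::ZERO`, and `collect_eviction_candidates` recomputes it when the `RelativeAccessed` eviction order is in use. A wrapper type is needed because plain `f32` is only `PartialOrd` (NaN has no total order), so it cannot serve directly as a sort key. A hypothetical, minimal version of such a wrapper — not the pageserver's actual `finite_f32` module, which may differ — looks like this:

    use std::cmp::Ordering;

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct Finite(f32);

    impl Finite {
        fn new(v: f32) -> Option<Self> {
            // Reject NaN and infinities so a total order is sound.
            v.is_finite().then_some(Finite(v))
        }
    }

    impl Eq for Finite {}

    impl Ord for Finite {
        fn cmp(&self, other: &Self) -> Ordering {
            // total_cmp is defined for all floats; inputs here are already finite.
            self.0.total_cmp(&other.0)
        }
    }

    impl PartialOrd for Finite {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    fn main() {
        let mut rel: Vec<Finite> = [0.9_f32, 0.1, 0.5]
            .into_iter()
            .filter_map(Finite::new)
            .collect();
        rel.sort(); // would not compile with bare f32 keys
        assert_eq!(rel.first().copied(), Some(Finite(0.1)));
    }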
- let remote_client = match self.remote_client.as_ref() { - Some(c) => c, - None => { - error!("no remote storage configured, cannot evict layers"); - return ControlFlow::Continue(()); - } - }; + if self.remote_client.is_none() { + error!("no remote storage configured, cannot evict layers"); + return ControlFlow::Continue(()); + } let mut js = tokio::task::JoinSet::new(); { @@ -274,9 +271,8 @@ impl Timeline { }; let layer = guard.drop_eviction_guard(); if no_activity_for > p.threshold { - let remote_client = remote_client.clone(); // this could cause a lot of allocations in some cases - js.spawn(async move { layer.evict_and_wait(&remote_client).await }); + js.spawn(async move { layer.evict_and_wait().await }); stats.candidates += 1; } } diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index 9fdc4d59f5..0e678e7148 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -9,6 +9,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + NeonPageserver, PgBin, wait_for_last_flush_lsn, ) @@ -75,9 +76,15 @@ class EvictionOrder(str, enum.Enum): if self == EvictionOrder.ABSOLUTE_ORDER: return {"type": "AbsoluteAccessed"} elif self == EvictionOrder.RELATIVE_ORDER_EQUAL: - return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": False}} + return { + "type": "RelativeAccessed", + "args": {"highest_layer_count_loses_first": False}, + } elif self == EvictionOrder.RELATIVE_ORDER_SPARE: - return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": True}} + return { + "type": "RelativeAccessed", + "args": {"highest_layer_count_loses_first": True}, + } else: raise RuntimeError(f"not implemented: {self}") @@ -91,14 +98,24 @@ class EvictionEnv: layer_size: int pgbench_init_lsns: Dict[TenantId, Lsn] - def timelines_du(self) -> Tuple[int, int, int]: + @property + def pageserver(self): + """ + Shortcut for tests that only use one pageserver. + """ + return self.neon_env.pageserver + + def timelines_du(self, pageserver: NeonPageserver) -> Tuple[int, int, int]: return poor_mans_du( - self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False + self.neon_env, + [(tid, tlid) for tid, tlid in self.timelines], + pageserver, + verbose=False, ) - def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]: + def du_by_timeline(self, pageserver: NeonPageserver) -> Dict[Tuple[TenantId, TimelineId], int]: return { - (tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0] + (tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], pageserver, verbose=True)[0] for tid, tlid in self.timelines } @@ -126,7 +143,13 @@ class EvictionEnv: _avg = cur.fetchone() def pageserver_start_with_disk_usage_eviction( - self, period, max_usage_pct, min_avail_bytes, mock_behavior, eviction_order: EvictionOrder + self, + pageserver: NeonPageserver, + period, + max_usage_pct, + min_avail_bytes, + mock_behavior, + eviction_order: EvictionOrder, ): disk_usage_config = { "period": period, @@ -138,7 +161,12 @@ class EvictionEnv: enc = toml.TomlEncoder() - self.neon_env.pageserver.start( + # these can sometimes happen during startup before any tenants have been + # loaded, so nothing can be evicted, we just wait for next iteration which + # is able to evict. 
+ pageserver.allowed_errors.append(".*WARN.* disk usage still high.*") + + pageserver.start( overrides=( "--pageserver-config-override=disk_usage_based_eviction=" + enc.dump_inline_table(disk_usage_config).replace("\n", " "), @@ -152,15 +180,10 @@ class EvictionEnv: ) def statvfs_called(): - assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*") + assert pageserver.log_contains(".*running mocked statvfs.*") wait_until(10, 1, statvfs_called) - # these can sometimes happen during startup before any tenants have been - # loaded, so nothing can be evicted, we just wait for next iteration which - # is able to evict. - self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*") - def human_bytes(amt: float) -> str: suffixes = ["", "Ki", "Mi", "Gi"] @@ -175,23 +198,28 @@ def human_bytes(amt: float) -> str: raise RuntimeError("unreachable") -@pytest.fixture -def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv: +def _eviction_env( + request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int +) -> EvictionEnv: """ Creates two tenants, one somewhat larger than the other. """ log.info(f"setting up eviction_env for test {request.node.name}") + neon_env_builder.num_pageservers = num_pageservers neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) # initial tenant will not be present on this pageserver env = neon_env_builder.init_configs() env.start() - pageserver_http = env.pageserver.http_client() + + # We will create all tenants on the 0th pageserver + pageserver_http = env.pageservers[0].http_client() # allow because we are invoking this manually; we always warn on executing disk based eviction - env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*") + for ps in env.pageservers: + ps.allowed_errors.append(r".* running disk usage based eviction due to pressure.*") # Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers. # Large count of layers and small layer size is good for testing because it makes evictions predictable. @@ -216,7 +244,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()]) - wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id, pageserver_id=1) timelines.append((tenant_id, timeline_id)) @@ -252,6 +280,20 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev return eviction_env +@pytest.fixture +def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv: + return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=1) + + +@pytest.fixture +def eviction_env_ha(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv: + """ + Variant of the eviction environment with two pageservers for testing eviction on + HA configurations with a secondary location. 
+ """ + return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=2) + + def test_broken_tenants_are_skipped(eviction_env: EvictionEnv): env = eviction_env @@ -264,10 +306,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv): healthy_tenant_id, healthy_timeline_id = env.timelines[1] broken_size_pre, _, _ = poor_mans_du( - env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True + env.neon_env, + [(broken_tenant_id, broken_timeline_id)], + env.pageserver, + verbose=True, ) healthy_size_pre, _, _ = poor_mans_du( - env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True + env.neon_env, + [(healthy_tenant_id, healthy_timeline_id)], + env.pageserver, + verbose=True, ) # try to evict everything, then validate that broken tenant wasn't touched @@ -277,10 +325,16 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv): log.info(f"{response}") broken_size_post, _, _ = poor_mans_du( - env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True + env.neon_env, + [(broken_tenant_id, broken_timeline_id)], + env.pageserver, + verbose=True, ) healthy_size_post, _, _ = poor_mans_du( - env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True + env.neon_env, + [(healthy_tenant_id, healthy_timeline_id)], + env.pageserver, + verbose=True, ) assert broken_size_pre == broken_size_post, "broken tenant should not be touched" @@ -302,7 +356,7 @@ def test_pageserver_evicts_until_pressure_is_relieved( env = eviction_env pageserver_http = env.pageserver_http - (total_on_disk, _, _) = env.timelines_du() + (total_on_disk, _, _) = env.timelines_du(env.pageserver) target = total_on_disk // 2 @@ -311,7 +365,7 @@ def test_pageserver_evicts_until_pressure_is_relieved( ) log.info(f"{response}") - (later_total_on_disk, _, _) = env.timelines_du() + (later_total_on_disk, _, _) = env.timelines_du(env.pageserver) actual_change = total_on_disk - later_total_on_disk @@ -336,8 +390,8 @@ def test_pageserver_respects_overridden_resident_size( env = eviction_env ps_http = env.pageserver_http - (total_on_disk, _, _) = env.timelines_du() - du_by_timeline = env.du_by_timeline() + (total_on_disk, _, _) = env.timelines_du(env.pageserver) + du_by_timeline = env.du_by_timeline(env.pageserver) log.info("du_by_timeline: %s", du_by_timeline) assert len(du_by_timeline) == 2, "this test assumes two tenants" @@ -379,8 +433,8 @@ def test_pageserver_respects_overridden_resident_size( GLOBAL_LRU_LOG_LINE, ), "this test is pointless if it fell back to global LRU" - (later_total_on_disk, _, _) = env.timelines_du() - later_du_by_timeline = env.du_by_timeline() + (later_total_on_disk, _, _) = env.timelines_du(env.pageserver) + later_du_by_timeline = env.du_by_timeline(env.pageserver) log.info("later_du_by_timeline: %s", later_du_by_timeline) actual_change = total_on_disk - later_total_on_disk @@ -412,7 +466,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E env = eviction_env ps_http = env.pageserver_http - (total_on_disk, _, _) = env.timelines_du() + (total_on_disk, _, _) = env.timelines_du(env.pageserver) target = total_on_disk response = ps_http.disk_usage_eviction_run( @@ -420,7 +474,7 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: E ) log.info(f"{response}") - (later_total_on_disk, _, _) = env.timelines_du() + (later_total_on_disk, _, _) = env.timelines_du(env.pageserver) actual_change = total_on_disk - later_total_on_disk assert 0 <= actual_change, "nothing can load layers during this test" 
assert actual_change >= target, "eviction must always evict more than target" @@ -448,8 +502,8 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder): env = eviction_env ps_http = env.pageserver_http - (total_on_disk, _, _) = env.timelines_du() - du_by_timeline = env.du_by_timeline() + (total_on_disk, _, _) = env.timelines_du(env.pageserver) + du_by_timeline = env.du_by_timeline(env.pageserver) # pick smaller or greater (iteration order is insertion order of scale=4 and scale=6) [warm, cold] = list(du_by_timeline.keys()) @@ -467,12 +521,12 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder): ) log.info(f"{response}") - (later_total_on_disk, _, _) = env.timelines_du() + (later_total_on_disk, _, _) = env.timelines_du(env.pageserver) actual_change = total_on_disk - later_total_on_disk assert 0 <= actual_change, "nothing can load layers during this test" assert actual_change >= target, "eviction must always evict more than target" - later_du_by_timeline = env.du_by_timeline() + later_du_by_timeline = env.du_by_timeline(env.pageserver) for tenant, later_tenant_usage in later_du_by_timeline.items(): assert ( later_tenant_usage < du_by_timeline[tenant] @@ -508,7 +562,10 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder): def poor_mans_du( - env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False + env: NeonEnv, + timelines: list[Tuple[TenantId, TimelineId]], + pageserver: NeonPageserver, + verbose: bool = False, ) -> Tuple[int, int, int]: """ Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples; @@ -518,7 +575,7 @@ def poor_mans_du( largest_layer = 0 smallest_layer = None for tenant_id, timeline_id in timelines: - timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id) + timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id) assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}" total = 0 for file in timeline_dir.iterdir(): @@ -549,6 +606,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv): env = eviction_env env.neon_env.pageserver.stop() env.pageserver_start_with_disk_usage_eviction( + env.pageserver, period="1s", max_usage_pct=90, min_avail_bytes=0, @@ -573,11 +631,12 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): env.neon_env.pageserver.stop() # make it seem like we're at 100% utilization by setting total bytes to the used bytes - total_size, _, _ = env.timelines_du() + total_size, _, _ = env.timelines_du(env.pageserver) blocksize = 512 total_blocks = (total_size + (blocksize - 1)) // blocksize env.pageserver_start_with_disk_usage_eviction( + env.pageserver, period="1s", max_usage_pct=33, min_avail_bytes=0, @@ -597,7 +656,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv): wait_until(10, 1, relieved_log_message) - post_eviction_total_size, _, _ = env.timelines_du() + post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage" @@ -612,13 +671,14 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): env.neon_env.pageserver.stop() # make it seem like we're at 100% utilization by setting total bytes to the used bytes - total_size, _, _ = env.timelines_du() + total_size, _, _ = env.timelines_du(env.pageserver) blocksize = 512 total_blocks = (total_size + (blocksize - 1)) // blocksize min_avail_bytes = total_size // 3 
env.pageserver_start_with_disk_usage_eviction( + env.pageserver, period="1s", max_usage_pct=100, min_avail_bytes=min_avail_bytes, @@ -638,7 +698,67 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): wait_until(10, 1, relieved_log_message) - post_eviction_total_size, _, _ = env.timelines_du() + post_eviction_total_size, _, _ = env.timelines_du(env.pageserver) + + assert ( + total_size - post_eviction_total_size >= min_avail_bytes + ), "we requested at least min_avail_bytes worth of free space" + + +def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv): + env = eviction_env_ha + + tenant_ids = [t[0] for t in env.timelines] + + log.info("Setting up secondary location...") + ps_attached = env.neon_env.pageservers[0] + ps_secondary = env.neon_env.pageservers[1] + for tenant_id in tenant_ids: + ps_secondary.tenant_location_configure( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + }, + ) + readback_conf = ps_secondary.read_tenant_location_conf(tenant_id) + log.info(f"Read back conf: {readback_conf}") + + # Request secondary location to download all layers that the attached location has + ps_attached.http_client().tenant_heatmap_upload(tenant_id) + ps_secondary.http_client().tenant_secondary_download(tenant_id) + + # Configure the secondary pageserver to have a phony small disk size + ps_secondary.stop() + total_size, _, _ = env.timelines_du(ps_secondary) + blocksize = 512 + total_blocks = (total_size + (blocksize - 1)) // blocksize + + min_avail_bytes = total_size // 3 + + env.pageserver_start_with_disk_usage_eviction( + ps_secondary, + period="1s", + max_usage_pct=100, + min_avail_bytes=min_avail_bytes, + mock_behavior={ + "type": "Success", + "blocksize": blocksize, + "total_blocks": total_blocks, + # Only count layer files towards used bytes in the mock_statvfs. + # This avoids accounting for metadata files & tenant conf in the tests. + "name_filter": ".*__.*", + }, + eviction_order=EvictionOrder.ABSOLUTE_ORDER, + ) + + def relieved_log_message(): + assert ps_secondary.log_contains(".*disk usage pressure relieved") + + wait_until(10, 1, relieved_log_message) + + post_eviction_total_size, _, _ = env.timelines_du(ps_secondary) assert ( total_size - post_eviction_total_size >= min_avail_bytes