diff --git a/Cargo.lock b/Cargo.lock
index 17aacd8ee7..ccf5fcef00 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2474,6 +2474,7 @@ dependencies = [
 "strum",
 "strum_macros",
 "svg_fmt",
+ "sync_wrapper",
 "tempfile",
 "tenant_size_model",
 "thiserror",
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 3c66400a05..094069e4c0 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -363,6 +363,11 @@ impl PageServerNode {
 .map(|x| serde_json::from_str(x))
 .transpose()
 .context("Failed to parse 'eviction_policy' json")?,
+ min_resident_size_override: settings
+ .remove("min_resident_size_override")
+ .map(|x| x.parse::<u64>())
+ .transpose()
+ .context("Failed to parse 'min_resident_size_override' as integer")?,
 };
 if !settings.is_empty() {
 bail!("Unrecognized tenant settings: {settings:?}")
@@ -435,6 +440,11 @@ impl PageServerNode {
 .map(|x| serde_json::from_str(x))
 .transpose()
 .context("Failed to parse 'eviction_policy' json")?,
+ min_resident_size_override: settings
+ .get("min_resident_size_override")
+ .map(|x| x.parse::<u64>())
+ .transpose()
+ .context("Failed to parse 'min_resident_size_override' as an integer")?,
 })
 .send()?
 .error_from_body()?;
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 7a43100ba5..6c3b30a444 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -120,6 +120,7 @@ pub struct TenantCreateRequest {
 // We might do that once the eviction feature has stabilized.
 // For now, this field is not even documented in the openapi_spec.yml.
 pub eviction_policy: Option,
+ pub min_resident_size_override: Option<u64>,
 }
 #[serde_as]
@@ -165,6 +166,7 @@ pub struct TenantConfigRequest {
 // We might do that once the eviction feature has stabilized.
 // For now, this field is not even documented in the openapi_spec.yml.
 pub eviction_policy: Option,
+ pub min_resident_size_override: Option<u64>,
 }
 impl TenantConfigRequest {
@@ -185,6 +187,7 @@ impl TenantConfigRequest {
 max_lsn_wal_lag: None,
 trace_read_requests: None,
 eviction_policy: None,
+ min_resident_size_override: None,
 }
 }
 }
diff --git a/libs/utils/src/approx_accurate.rs b/libs/utils/src/approx_accurate.rs
new file mode 100644
index 0000000000..5de3461521
--- /dev/null
+++ b/libs/utils/src/approx_accurate.rs
@@ -0,0 +1,51 @@
+/// Three-state `max` accumulator.
+///
+/// If it accumulates zero or more `Some(T)` values, it is the `Accurate` maximum of those values.
+/// As soon as a single `None` value is merged, it becomes the `Approximate` variant.
+///
+/// Remove when `Layer::file_size` is no longer an `Option`.
+#[derive(Default, Debug)]
+pub enum ApproxAccurate<T> {
+ Approximate(T),
+ Accurate(T),
+ #[default]
+ Empty,
+}
+
+impl<T: Ord + Default> ApproxAccurate<T> {
+ /// `max(self, next)`: merging a `None` makes the result `Approximate`, and once approximate it stays approximate.
+ #[must_use] + pub fn max(self, next: Option) -> ApproxAccurate { + use ApproxAccurate::*; + match (self, next) { + (Accurate(a) | Approximate(a), None) => Approximate(a), + (Empty, None) => Approximate(T::default()), + (Accurate(a), Some(b)) => Accurate(a.max(b)), + (Approximate(a), Some(b)) => Approximate(a.max(b)), + (Empty, Some(b)) => Accurate(b), + } + } + + pub fn is_approximate(&self) -> bool { + matches!(self, ApproxAccurate::Approximate(_)) + } + + pub fn accurate(self) -> Option { + use ApproxAccurate::*; + match self { + Accurate(a) => Some(a), + Empty => Some(T::default()), + Approximate(_) => None, + } + } + + pub fn unwrap_accurate_or(self, default: T) -> T { + use ApproxAccurate::*; + match self { + Accurate(a) => a, + Approximate(_) => default, + // Empty is still accurate, just special case for above `max` + Empty => T::default(), + } + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 766d759ab4..2c8bb6001e 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -33,6 +33,7 @@ pub mod pid_file; // Misc pub mod accum; +pub mod approx_accurate; pub mod shutdown; // Utility for binding TcpListeners with proper socket options. diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 8d6641a387..0bc7eba95e 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -48,6 +48,7 @@ serde_json = { workspace = true, features = ["raw_value"] } serde_with.workspace = true signal-hook.workspace = true svg_fmt.workspace = true +sync_wrapper.workspace = true tokio-tar.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 14e86ddcb6..f2f318adda 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, Context}; use clap::{Arg, ArgAction, Command}; use fail::FailScenario; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; +use pageserver::disk_usage_eviction_task::launch_disk_usage_global_eviction_task; use remote_storage::GenericRemoteStorage; use tracing::*; @@ -319,6 +320,10 @@ fn start_pageserver( // Scan the local 'tenants/' directory and start loading the tenants BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?; + if let Some(remote_storage) = &remote_storage { + launch_disk_usage_global_eviction_task(conf, remote_storage.clone())?; + } + // Start up the service to handle HTTP mgmt API request. We created the // listener earlier already. { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 39282ce320..b1c78e565a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -27,6 +27,7 @@ use utils::{ logging::LogFormat, }; +use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig; use crate::tenant::config::TenantConf; use crate::tenant::config::TenantConfOpt; use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME}; @@ -176,6 +177,8 @@ pub struct PageServerConf { // See the corresponding metric's help string. 
pub evictions_low_residence_duration_metric_threshold: Duration, + pub disk_usage_based_eviction: Option, + pub test_remote_failures: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, @@ -248,6 +251,8 @@ struct PageServerConfigBuilder { evictions_low_residence_duration_metric_threshold: BuilderValue, + disk_usage_based_eviction: BuilderValue>, + test_remote_failures: BuilderValue, ondemand_download_behavior_treat_error_as_warn: BuilderValue, @@ -306,6 +311,8 @@ impl Default for PageServerConfigBuilder { ) .expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")), + disk_usage_based_eviction: Set(None), + test_remote_failures: Set(0), ondemand_download_behavior_treat_error_as_warn: Set(false), @@ -425,6 +432,10 @@ impl PageServerConfigBuilder { self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value); } + pub fn disk_usage_based_eviction(&mut self, value: Option) { + self.disk_usage_based_eviction = BuilderValue::Set(value); + } + pub fn ondemand_download_behavior_treat_error_as_warn( &mut self, ondemand_download_behavior_treat_error_as_warn: bool, @@ -503,6 +514,9 @@ impl PageServerConfigBuilder { .ok_or(anyhow!( "missing evictions_low_residence_duration_metric_threshold" ))?, + disk_usage_based_eviction: self + .disk_usage_based_eviction + .ok_or(anyhow!("missing disk_usage_based_eviction"))?, test_remote_failures: self .test_remote_failures .ok_or(anyhow!("missing test_remote_failuers"))?, @@ -693,6 +707,12 @@ impl PageServerConf { builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?), "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?), "evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?), + "disk_usage_based_eviction" => { + tracing::info!("disk_usage_based_eviction: {:#?}", &item); + builder.disk_usage_based_eviction( + toml_edit::de::from_item(item.clone()) + .context("parse disk_usage_based_eviction")?) 
+ }, "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } @@ -797,6 +817,13 @@ impl PageServerConf { ); } + if let Some(item) = item.get("min_resident_size_override") { + t_conf.min_resident_size_override = Some( + toml_edit::de::from_item(item.clone()) + .context("parse min_resident_size_override")?, + ); + } + Ok(t_conf) } @@ -837,6 +864,7 @@ impl PageServerConf { defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD, ) .unwrap(), + disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, } @@ -1038,6 +1066,7 @@ log_format = 'json' evictions_low_residence_duration_metric_threshold: humantime::parse_duration( defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD )?, + disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, }, @@ -1090,6 +1119,7 @@ log_format = 'json' metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?), synthetic_size_calculation_interval: Duration::from_secs(333), evictions_low_residence_duration_metric_threshold: Duration::from_secs(444), + disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, }, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs new file mode 100644 index 0000000000..86654134af --- /dev/null +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -0,0 +1,666 @@ +//! This module implements the pageserver-global disk-usage-based layer eviction task. +//! +//! Function `launch_disk_usage_global_eviction_task` starts a pageserver-global background +//! loop that evicts layers in response to a shortage of available bytes +//! in the $repo/tenants directory's filesystem. +//! +//! The loop runs periodically at a configurable `period`. +//! +//! Each loop iteration uses `statvfs` to determine filesystem-level space usage. +//! It compares the returned usage data against two different types of thresholds. +//! The iteration tries to evict layers until app-internal accounting says we should be below the thresholds. +//! We cross-check this internal accounting with the real world by making another `statvfs` at the end of the iteration. +//! We're good if that second statvfs shows that we're _actually_ below the configured thresholds. +//! If we're still above one or more thresholds, we emit a warning log message, leaving it to the operator to investigate further. +//! +//! There are two thresholds: +//! `max_usage_pct` is the relative available space, expressed in percent of the total filesystem space. +//! If the actual usage is higher, the threshold is exceeded. +//! `min_avail_bytes` is the absolute available space in bytes. +//! If the actual usage is lower, the threshold is exceeded. +//! +//! The iteration evicts layers in LRU fashion. +//! It tries first with a reservation of up to `tenant_min_resident_size` bytes of the most recent layers per tenant. +//! The layers not part of the per-tenant reservation are evicted least-recently-used first until we're below all thresholds. +//! If the per-tenant-reservation strategy doesn't work out, it falls back to global LRU. 
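+//!
+//! For illustration only, enabling the task in `pageserver.toml` could look like the snippet
+//! below. The key and field names are the ones parsed in `config.rs` and
+//! `DiskUsageEvictionTaskConfig`; the values are made-up examples, not recommendations:
+//!
+//! ```toml
+//! [disk_usage_based_eviction]
+//! max_usage_pct = 80
+//! min_avail_bytes = 2000000000
+//! period = "60s"
+//! ```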
+use std::{ + collections::HashMap, + ops::ControlFlow, + sync::{Arc, Mutex}, + time::Duration, +}; + +use anyhow::Context; +use nix::dir::Dir; +use remote_storage::GenericRemoteStorage; +use serde::{Deserialize, Serialize}; +use sync_wrapper::SyncWrapper; +use tokio::time::Instant; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, instrument, warn, Instrument}; +use utils::{approx_accurate::ApproxAccurate, id::TenantId}; + +use crate::{ + config::PageServerConf, + task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, + tenant::{self, LocalLayerInfoForDiskUsageEviction, Timeline}, +}; + +fn deserialize_pct_0_to_100<'de, D>(deserializer: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let v: u64 = serde::de::Deserialize::deserialize(deserializer)?; + if v > 100 { + return Err(serde::de::Error::custom( + "must be an integer between 0 and 100", + )); + } + Ok(v) +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DiskUsageEvictionTaskConfig { + #[serde(deserialize_with = "deserialize_pct_0_to_100")] + pub max_usage_pct: u64, + pub min_avail_bytes: u64, + #[serde(with = "humantime_serde")] + pub period: Duration, +} + +pub fn launch_disk_usage_global_eviction_task( + conf: &'static PageServerConf, + storage: GenericRemoteStorage, +) -> anyhow::Result<()> { + let Some(task_config) = &conf.disk_usage_based_eviction else { + info!("disk usage based eviction task not configured"); + return Ok(()); + }; + + let tenants_dir_fd = { + let tenants_path = conf.tenants_path(); + nix::dir::Dir::open( + &tenants_path, + nix::fcntl::OFlag::O_DIRECTORY, + nix::sys::stat::Mode::empty(), + ) + .with_context(|| format!("open tenants_path {tenants_path:?}"))? + }; + + info!("launching disk usage based eviction task"); + + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::DiskUsageEviction, + None, + None, + "disk usage based eviction", + false, + async move { + disk_usage_eviction_task( + task_config, + storage, + tenants_dir_fd, + task_mgr::shutdown_token(), + ) + .await; + info!("disk usage based eviction task finishing"); + Ok(()) + }, + ); + + Ok(()) +} + +#[instrument(skip_all)] +async fn disk_usage_eviction_task( + task_config: &DiskUsageEvictionTaskConfig, + storage: GenericRemoteStorage, + tenants_dir_fd: Dir, + cancel: CancellationToken, +) { + // nix::dir::Dir is Send but not Sync. + // One would think that that is sufficient, but rustc complains that the &tenants_dir_fd + // that we pass to disk_usage_eviction_iteration below will outlive the .await; + // The reason is that the &tenants_dir_fd is not sync because of stdlib-enforced axiom + // T: Sync <=> &T: Send + // The solution is to use SyncWrapper, which, by owning the tenants_dir_fd, can impl Sync. + let mut tenants_dir_fd = SyncWrapper::new(tenants_dir_fd); + + use crate::tenant::tasks::random_init_delay; + { + if random_init_delay(task_config.period, &cancel) + .await + .is_err() + { + info!("shutting down"); + return; + } + } + + let mut iteration_no = 0; + loop { + iteration_no += 1; + let start = Instant::now(); + + async { + let res = disk_usage_eviction_task_iteration( + task_config, + &storage, + &mut tenants_dir_fd, + &cancel, + ) + .await; + + match res { + Ok(()) => {} + Err(e) => { + // these stat failures are expected to be very rare + warn!("iteration failed, unexpected error: {e:#}"); + } + } + } + .instrument(tracing::info_span!("iteration", iteration_no)) + .await; + + let sleep_until = start + task_config.period; + tokio::select! 
{ + _ = tokio::time::sleep_until(sleep_until) => {}, + _ = cancel.cancelled() => { + info!("shutting down"); + break + } + } + } +} + +pub trait Usage: Clone + Copy + std::fmt::Debug { + fn has_pressure(&self) -> bool; + fn add_available_bytes(&mut self, bytes: u64); +} + +async fn disk_usage_eviction_task_iteration( + task_config: &DiskUsageEvictionTaskConfig, + storage: &GenericRemoteStorage, + tenants_dir_fd: &mut SyncWrapper, + cancel: &CancellationToken, +) -> anyhow::Result<()> { + let usage_pre = filesystem_level_usage::get(tenants_dir_fd, task_config) + .context("get filesystem-level disk usage before evictions")?; + let res = disk_usage_eviction_task_iteration_impl(storage, usage_pre, cancel).await; + match res { + Ok(outcome) => { + debug!(?outcome, "disk_usage_eviction_iteration finished"); + match outcome { + IterationOutcome::NoPressure | IterationOutcome::Cancelled => { + // nothing to do, select statement below will handle things + } + IterationOutcome::Finished(outcome) => { + // Verify with statvfs whether we made any real progress + let after = filesystem_level_usage::get(tenants_dir_fd, task_config) + // It's quite unlikely to hit the error here. Keep the code simple and bail out. + .context("get filesystem-level disk usage after evictions")?; + + debug!(?after, "disk usage"); + + if after.has_pressure() { + // Don't bother doing an out-of-order iteration here now. + // In practice, the task period is set to a value in the tens-of-seconds range, + // which will cause another iteration to happen soon enough. + // TODO: deltas between the three different usages would be helpful, + // consider MiB, GiB, TiB + warn!(?outcome, ?after, "disk usage still high"); + } else { + info!(?outcome, ?after, "disk usage pressure relieved"); + } + } + } + } + Err(e) => { + error!("disk_usage_eviction_iteration failed: {:#}", e); + } + } + + Ok(()) +} + +#[derive(Debug, Serialize)] +#[allow(clippy::large_enum_variant)] +pub enum IterationOutcome { + NoPressure, + Cancelled, + Finished(IterationOutcomeFinished), +} + +// The `#[allow(dead_code)]` is to suppress warnings about only the Debug impl reading these fields. +// We use the Debug impl for logging, so, it's allright. +#[allow(dead_code)] +#[derive(Debug, Serialize)] +pub struct IterationOutcomeFinished { + /// The actual usage observed before we started the iteration. + before: U, + /// The expected value for `after`, according to internal accounting, after phase 1. + planned: PlannedUsage, + /// The outcome of phase 2, where we actually do the evictions. + /// + /// If all layers that phase 1 planned to evict _can_ actually get evicted, this will + /// be the same as `planned`. + assumed: AssumedUsage, +} + +// The `#[allow(dead_code)]` is to suppress warnings about only the Debug impl reading these fields. +// We use the Debug impl for logging, so, it's allright. +#[derive(Debug, Serialize)] +#[allow(dead_code)] +struct AssumedUsage { + /// The expected value for `after`, after phase 2. + projected_after: U, + /// The layers we failed to evict during phase 2. + failed: LayerCount, +} + +// The `#[allow(dead_code)]` is to suppress warnings about only the Debug impl reading these fields. +// We use the Debug impl for logging, so, it's allright. 
+#[allow(dead_code)] +#[derive(Debug, Serialize)] +struct PlannedUsage { + respecting_tenant_min_resident_size: U, + fallback_to_global_lru: Option, +} + +#[allow(dead_code)] +#[derive(Debug, Default, Serialize)] +struct LayerCount { + file_sizes: u64, + count: usize, +} + +#[allow(clippy::needless_late_init)] +pub async fn disk_usage_eviction_task_iteration_impl( + storage: &GenericRemoteStorage, + usage_pre: U, + cancel: &CancellationToken, +) -> anyhow::Result> { + static MUTEX: once_cell::sync::Lazy> = + once_cell::sync::Lazy::new(|| tokio::sync::Mutex::new(())); + + let _g = MUTEX + .try_lock() + .map_err(|_| anyhow::anyhow!("iteration is already executing"))?; + + // planned post-eviction usage + let mut usage_planned_min_resident_size_respecting = usage_pre; + let mut usage_planned_global_lru = None; + // achieved post-eviction usage according to internal accounting + let mut usage_assumed = usage_pre; + // actual usage read after batched evictions + + debug!(?usage_pre, "disk usage"); + + if !usage_pre.has_pressure() { + return Ok(IterationOutcome::NoPressure); + } + + warn!( + ?usage_pre, + "running disk usage based eviction due to pressure" + ); + + let mut lru_candidates: Vec<(_, LocalLayerInfoForDiskUsageEviction)> = Vec::new(); + + // get a snapshot of the list of tenants + let tenants = tenant::mgr::list_tenants() + .await + .context("get list of tenants")?; + + { + let mut tmp = Vec::new(); + for (tenant_id, _state) in &tenants { + let flow = extend_lru_candidates( + Mode::RespectTenantMinResidentSize, + *tenant_id, + &mut lru_candidates, + &mut tmp, + cancel, + ) + .await; + + if let ControlFlow::Break(()) = flow { + return Ok(IterationOutcome::Cancelled); + } + + assert!(tmp.is_empty(), "tmp has to be fully drained each iteration"); + } + } + + if cancel.is_cancelled() { + return Ok(IterationOutcome::Cancelled); + } + + // phase1: select victims to relieve pressure + lru_candidates.sort_unstable_by_key(|(_, layer)| layer.last_activity_ts); + let mut batched: HashMap<_, Vec> = HashMap::new(); + for (i, (timeline, layer)) in lru_candidates.into_iter().enumerate() { + if !usage_planned_min_resident_size_respecting.has_pressure() { + debug!( + no_candidates_evicted = i, + "took enough candidates for pressure to be relieved" + ); + break; + } + + usage_planned_min_resident_size_respecting.add_available_bytes(layer.file_size()); + + batched + .entry(TimelineKey(timeline.clone())) + .or_default() + .push(layer); + } + // If we can't relieve pressure while respecting tenant_min_resident_size, fall back to global LRU. 
+ if usage_planned_min_resident_size_respecting.has_pressure() { + // NB: tests depend on parts of this log message + warn!(?usage_pre, ?usage_planned_min_resident_size_respecting, "tenant_min_resident_size-respecting LRU would not relieve pressure, falling back to global LRU"); + batched.clear(); + let mut usage_planned = usage_pre; + let mut global_lru_candidates = Vec::new(); + let mut tmp = Vec::new(); + for (tenant_id, _state) in &tenants { + let flow = extend_lru_candidates( + Mode::GlobalLru, + *tenant_id, + &mut global_lru_candidates, + &mut tmp, + cancel, + ) + .await; + + if let ControlFlow::Break(()) = flow { + return Ok(IterationOutcome::Cancelled); + } + + assert!(tmp.is_empty(), "tmp has to be fully drained each iteration"); + } + global_lru_candidates.sort_unstable_by_key(|(_, layer)| layer.last_activity_ts); + for (timeline, layer) in global_lru_candidates { + usage_planned.add_available_bytes(layer.file_size()); + batched + .entry(TimelineKey(timeline.clone())) + .or_default() + .push(layer); + if cancel.is_cancelled() { + return Ok(IterationOutcome::Cancelled); + } + } + usage_planned_global_lru = Some(usage_planned); + } + let usage_planned = PlannedUsage { + respecting_tenant_min_resident_size: usage_planned_min_resident_size_respecting, + fallback_to_global_lru: usage_planned_global_lru, + }; + + debug!(?usage_planned, "usage planned"); + + // phase2: evict victims batched by timeline + let mut batch = Vec::new(); + let mut evictions_failed = LayerCount::default(); + for (timeline, layers) in batched { + let tenant_id = timeline.tenant_id; + let timeline_id = timeline.timeline_id; + + batch.clear(); + batch.extend(layers.iter().map(|x| &x.layer).cloned()); + let batch_size = batch.len(); + + debug!(%timeline_id, "evicting batch for timeline"); + + async { + let results = timeline.evict_layers(storage, &batch, cancel.clone()).await; + + match results { + Err(e) => { + warn!("failed to evict batch: {:#}", e); + } + Ok(results) => { + assert_eq!(results.len(), layers.len()); + for (result, layer) in results.into_iter().zip(layers.iter()) { + match result { + Some(Ok(true)) => { + usage_assumed.add_available_bytes(layer.file_size()); + } + Some(Ok(false)) => { + // this is: + // - Replacement::{NotFound, Unexpected} + // - it cannot be is_remote_layer, filtered already + evictions_failed.file_sizes += layer.file_size(); + evictions_failed.count += 1; + } + None => { + assert!(cancel.is_cancelled()); + return; + } + Some(Err(e)) => { + // we really shouldn't be getting this, precondition failure + error!("failed to evict layer: {:#}", e); + } + } + } + } + } + } + .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size)) + .await; + + if cancel.is_cancelled() { + return Ok(IterationOutcome::Cancelled); + } + } + + Ok(IterationOutcome::Finished(IterationOutcomeFinished { + before: usage_pre, + planned: usage_planned, + assumed: AssumedUsage { + projected_after: usage_assumed, + failed: evictions_failed, + }, + })) +} + +/// Different modes of gathering tenant's least recently used layers. +#[derive(Debug)] +enum Mode { + /// Add all but the most recently used `min_resident_size` worth of layers to the candidates + /// list. + /// + /// `min_resident_size` defaults to maximum layer file size of the tenant. This ensures that + /// the tenant will always have one layer resident. If we cannot compute `min_resident_size` + /// accurately because metadata is missing we use hardcoded constant. 
`min_resident_size` can + /// be overridden per tenant for important tenants. + RespectTenantMinResidentSize, + /// Consider all layer files from all tenants in LRU order. + /// + /// This is done if the `min_resident_size` respecting does not relieve pressure. + GlobalLru, +} + +#[instrument(skip_all, fields(?mode, %tenant_id))] +async fn extend_lru_candidates( + mode: Mode, + tenant_id: TenantId, + lru_candidates: &mut Vec<(Arc, LocalLayerInfoForDiskUsageEviction)>, + scratch: &mut Vec<(Arc, LocalLayerInfoForDiskUsageEviction)>, + cancel: &CancellationToken, +) -> ControlFlow<()> { + debug!("begin"); + + let tenant = match tenant::mgr::get_tenant(tenant_id, true).await { + Ok(tenant) => tenant, + Err(e) => { + // this can happen if tenant has lifecycle transition after we fetched it + debug!("failed to get tenant: {e:#}"); + return ControlFlow::Continue(()); + } + }; + + if cancel.is_cancelled() { + return ControlFlow::Break(()); + } + + let mut max_layer_size = ApproxAccurate::default(); + for tl in tenant.list_timelines() { + if !tl.is_active() { + continue; + } + let info = tl.get_local_layers_for_disk_usage_eviction(); + debug!(timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); + scratch.extend( + info.resident_layers + .into_iter() + .map(|layer_infos| (tl.clone(), layer_infos)), + ); + max_layer_size = max_layer_size.max(info.max_layer_size.accurate()); + + if cancel.is_cancelled() { + return ControlFlow::Break(()); + } + } + + let min_resident_size = match mode { + Mode::GlobalLru => { + lru_candidates.append(scratch); + return ControlFlow::Continue(()); + } + Mode::RespectTenantMinResidentSize => match tenant.get_min_resident_size_override() { + Some(size) => size, + None => { + match max_layer_size.accurate() { + Some(size) => size, + None => { + let prod_max_layer_file_size = 332_880_000; + // rate-limit warning in case above comment is wrong and we're missing `LayerMetadata` for many layers + static LAST_WARNED: Mutex> = Mutex::new(None); + let mut last_warned = LAST_WARNED.lock().unwrap(); + if last_warned + .map(|v| v.elapsed() > Duration::from_secs(60)) + .unwrap_or(true) + { + warn!(value=prod_max_layer_file_size, "some layers don't have LayerMetadata to calculate max_layer_file_size, using default value"); + *last_warned = Some(Instant::now()); + } + prod_max_layer_file_size + } + } + } + }, + }; + + scratch.sort_unstable_by_key(|(_, layer_info)| layer_info.last_activity_ts); + + let mut current: u64 = scratch.iter().map(|(_, layer)| layer.file_size()).sum(); + for (tl, layer) in scratch.drain(..) 
{ + if cancel.is_cancelled() { + return ControlFlow::Break(()); + } + if current <= min_resident_size { + break; + } + current -= layer.file_size(); + debug!(?layer, "adding layer to lru_candidates"); + lru_candidates.push((tl, layer)); + } + + ControlFlow::Continue(()) +} + +struct TimelineKey(Arc); + +impl PartialEq for TimelineKey { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.0, &other.0) + } +} + +impl Eq for TimelineKey {} + +impl std::hash::Hash for TimelineKey { + fn hash(&self, state: &mut H) { + Arc::as_ptr(&self.0).hash(state); + } +} + +impl std::ops::Deref for TimelineKey { + type Target = Timeline; + + fn deref(&self) -> &Self::Target { + self.0.as_ref() + } +} + +mod filesystem_level_usage { + use anyhow::Context; + use nix::{ + dir::Dir, + sys::statvfs::{self, Statvfs}, + }; + use sync_wrapper::SyncWrapper; + + use super::DiskUsageEvictionTaskConfig; + + // The `#[allow(dead_code)]` is to suppress warnings about only the Debug impl reading these fields. + // We use the Debug impl for logging, so, it's allright. + #[derive(Debug, Clone, Copy)] + #[allow(dead_code)] + pub struct Usage<'a> { + config: &'a DiskUsageEvictionTaskConfig, + + /// Filesystem capacity + total_bytes: u64, + /// Free filesystem space + avail_bytes: u64, + } + + impl super::Usage for Usage<'_> { + fn has_pressure(&self) -> bool { + let usage_pct = + (100.0 * (1.0 - ((self.avail_bytes as f64) / (self.total_bytes as f64)))) as u64; + + let pressures = [ + ( + "min_avail_bytes", + self.avail_bytes < self.config.min_avail_bytes, + ), + ("max_usage_pct", usage_pct > self.config.max_usage_pct), + ]; + + pressures.into_iter().any(|(_, has_pressure)| has_pressure) + } + + fn add_available_bytes(&mut self, bytes: u64) { + self.avail_bytes += bytes; + } + } + + pub fn get<'a>( + tenants_dir_fd: &mut SyncWrapper, + config: &'a DiskUsageEvictionTaskConfig, + ) -> anyhow::Result> { + let stat: Statvfs = statvfs::fstatvfs(tenants_dir_fd.get_mut()) + .context("statvfs failed, presumably directory got unlinked")?; + + // https://unix.stackexchange.com/a/703650 + let blocksize = if stat.fragment_size() > 0 { + stat.fragment_size() + } else { + stat.block_size() + }; + + // use blocks_available (b_avail) since, pageserver runs as unprivileged user + let avail_bytes = stat.blocks_available() * blocksize; + let total_bytes = stat.blocks() * blocksize; + + Ok(Usage { + config, + total_bytes, + avail_bytes, + }) + } +} diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 3d3a9892bf..1da15a8951 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -27,6 +27,31 @@ paths: id: type: integer + /v1/disk_usage_eviction/run: + put: + description: Do an iteration of disk-usage-based eviction to evict a given amount of disk space. + security: [] + requestBody: + content: + application/json: + schema: + type: object + required: + - wanted_trimmed_bytes + properties: + wanted_trimmed_bytes: + type: integer + responses: + "200": + description: | + The run completed. + This does not necessarily mean that we actually evicted `wanted_trimmed_bytes`. + Examine the returned object for detail, or, just watch the actual effect of the call using `du` or `df`. 
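+
+            For illustration, an abridged response could look like the following (the structure
+            mirrors the serialized iteration outcome; the values are made up):
+
+              {"Finished": {"before":  {"wanted_trimmed_bytes": 1000000, "freed_bytes": 0},
+                            "planned": {"respecting_tenant_min_resident_size": {...}, "fallback_to_global_lru": null},
+                            "assumed": {"projected_after": {"wanted_trimmed_bytes": 1000000, "freed_bytes": 1048576},
+                                        "failed": {"file_sizes": 0, "count": 0}}}}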
+ content: + application/json: + schema: + type: object + /v1/tenant/{tenant_id}: parameters: - name: tenant_id diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 39f2776952..cad9a2efaf 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -746,6 +746,8 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result, ApiError> { + use std::str::FromStr; + let tenant_id = get_request_param(&r, "tenant_id")?; + let tenant_id = TenantId::from_str(tenant_id).map_err(|e| ApiError::BadRequest(e.into()))?; + + let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) + .await + .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; + + tenant.set_broken("broken from test"); + + json_response(StatusCode::OK, ()) +} + +async fn disk_usage_eviction_run(mut r: Request) -> Result, ApiError> { + check_permission(&r, None)?; + + #[derive(serde::Deserialize)] + struct Config { + /// How much to trim at minimum + wanted_trimmed_bytes: u64, + } + + #[derive(Debug, Clone, Copy, serde::Serialize)] + struct Usage { + wanted_trimmed_bytes: u64, + freed_bytes: u64, + } + + impl crate::disk_usage_eviction_task::Usage for Usage { + fn has_pressure(&self) -> bool { + self.wanted_trimmed_bytes > self.freed_bytes + } + + fn add_available_bytes(&mut self, bytes: u64) { + self.freed_bytes += bytes; + } + } + + let config = json_request::(&mut r) + .await + .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?; + + let usage = Usage { + wanted_trimmed_bytes: config.wanted_trimmed_bytes, + freed_bytes: 0, + }; + + use crate::task_mgr::MGMT_REQUEST_RUNTIME; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + let state = get_state(&r); + + let Some(storage) = state.remote_storage.clone() else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "remote storage not configured, cannot run eviction iteration" + ))) + }; + + let cancel = CancellationToken::new(); + let child_cancel = cancel.clone(); + let _g = cancel.drop_guard(); + + crate::task_mgr::spawn( + MGMT_REQUEST_RUNTIME.handle(), + TaskKind::DiskUsageEviction, + None, + None, + "ondemand disk usage eviction", + false, + async move { + let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( + &storage, + usage, + &child_cancel, + ) + .await; + + info!(?res, "disk_usage_eviction_task_iteration_impl finished"); + + let _ = tx.send(res); + Ok(()) + } + .in_current_span(), + ); + + let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, response) +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 09e21ae755..4c64ef371a 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -4,6 +4,7 @@ pub mod broker_client; pub mod config; pub mod consumption_metrics; pub mod context; +pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; pub mod keyspace; diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 2734031a09..5edefcac82 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -234,6 +234,9 @@ pub enum TaskKind { // Eviction. One per timeline. Eviction, + /// See [`crate::disk_usage_eviction_task`]. 
+ DiskUsageEviction, + // Initial logical size calculation InitialLogicalSizeCalculation, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5f1e23b873..6fabfdca71 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -94,7 +94,7 @@ mod timeline; pub mod size; -pub use timeline::{PageReconstructError, Timeline}; +pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline}; // re-export this function so that page_cache.rs can use it. pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file; @@ -1693,6 +1693,13 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.trace_read_requests) } + pub fn get_min_resident_size_override(&self) -> Option { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .min_resident_size_override + .or(self.conf.default_tenant_conf.min_resident_size_override) + } + pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { *self.tenant_conf.write().unwrap() = new_tenant_conf; } @@ -2762,6 +2769,7 @@ pub mod harness { max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag), trace_read_requests: Some(tenant_conf.trace_read_requests), eviction_policy: Some(tenant_conf.eviction_policy), + min_resident_size_override: tenant_conf.min_resident_size_override, } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 48cb6be121..cdabb23a7b 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -92,6 +92,7 @@ pub struct TenantConf { pub max_lsn_wal_lag: NonZeroU64, pub trace_read_requests: bool, pub eviction_policy: EvictionPolicy, + pub min_resident_size_override: Option, } /// Same as TenantConf, but this struct preserves the information about @@ -159,6 +160,10 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub eviction_policy: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub min_resident_size_override: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -220,6 +225,9 @@ impl TenantConfOpt { .trace_read_requests .unwrap_or(global_conf.trace_read_requests), eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy), + min_resident_size_override: self + .min_resident_size_override + .or(global_conf.min_resident_size_override), } } } @@ -251,6 +259,7 @@ impl Default for TenantConf { .expect("cannot parse default max walreceiver Lsn wal lag"), trace_read_requests: false, eviction_policy: EvictionPolicy::NoEviction, + min_resident_size_override: None, } } } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 52ce2cab42..659f3ed773 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -121,10 +121,10 @@ struct LayerAccessStatsInner { } #[derive(Debug, Clone, Copy)] -pub(super) struct LayerAccessStatFullDetails { - pub(super) when: SystemTime, - pub(super) task_kind: TaskKind, - pub(super) access_kind: LayerAccessKind, +pub(crate) struct LayerAccessStatFullDetails { + pub(crate) when: SystemTime, + pub(crate) task_kind: TaskKind, + pub(crate) access_kind: LayerAccessKind, } #[derive(Clone, Copy, strum_macros::EnumString)] @@ -255,7 +255,7 @@ impl LayerAccessStats { ret } - pub(super) fn most_recent_access_or_residence_event( + fn most_recent_access_or_residence_event( &self, ) -> Either { let locked = self.0.lock().unwrap(); @@ -268,6 +268,13 @@ impl LayerAccessStats { } } } + + 
pub(crate) fn latest_activity(&self) -> SystemTime {
+ match self.most_recent_access_or_residence_event() {
+ Either::Left(mra) => mra.when,
+ Either::Right(re) => re.timestamp,
+ }
+ }
 }
 /// Supertrait of the [`Layer`] trait that captures the bare minimum interface
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index f5dbe63b0b..509dff4d41 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -13,6 +13,7 @@ use pageserver_api::models::{
 DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
 DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
 };
+use remote_storage::GenericRemoteStorage;
 use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -55,6 +56,7 @@ use pageserver_api::reltag::RelTag;
 use postgres_connection::PgConnectionConfig;
 use postgres_ffi::to_pg_timestamp;
 use utils::{
+ approx_accurate::ApproxAccurate,
 id::{TenantId, TimelineId},
 lsn::{AtomicLsn, Lsn, RecordLsn},
 seqwait::SeqWait,
@@ -974,6 +976,25 @@ impl Timeline {
 }
 }
+ /// Evict a batch of layers.
+ ///
+ /// GenericRemoteStorage reference is required as a witness[^witness_article] for "remote storage is configured."
+ ///
+ /// [^witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
+ pub async fn evict_layers(
+ &self,
+ _: &GenericRemoteStorage,
+ layers_to_evict: &[Arc],
+ cancel: CancellationToken,
+ ) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
+ let remote_client = self.remote_client.clone().expect(
+ "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
+ );
+
+ self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
+ .await
+ }
+
 /// Evict multiple layers at once, continuing through errors.
/// /// Try to evict the given `layers_to_evict` by @@ -1011,6 +1032,15 @@ impl Timeline { // now lock out layer removal (compaction, gc, timeline deletion) let layer_removal_guard = self.layer_removal_cs.lock().await; + { + // to avoid racing with detach and delete_timeline + let state = self.current_state(); + anyhow::ensure!( + state == TimelineState::Active, + "timeline is not active but {state:?}" + ); + } + // start the batch update let mut layer_map = self.layers.write().unwrap(); let mut batch_updates = layer_map.batch_update(); @@ -1044,6 +1074,8 @@ impl Timeline { use super::layer_map::Replacement; if local_layer.is_remote_layer() { + // TODO: consider returning an err here instead of false, which is the same out the + // match later return Ok(false); } @@ -1136,6 +1168,8 @@ impl Timeline { } }; + // TODO: update metrics for how + Ok(replaced) } } @@ -4014,6 +4048,68 @@ impl Timeline { } } +pub struct DiskUsageEvictionInfo { + /// Timeline's largest layer (remote or resident) + pub max_layer_size: ApproxAccurate, + /// Timeline's resident layers + pub resident_layers: Vec, +} + +pub struct LocalLayerInfoForDiskUsageEviction { + pub layer: Arc, + pub last_activity_ts: SystemTime, +} + +impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it + // having to allocate a string to this is bad, but it will rarely be formatted + let ts = chrono::DateTime::::from(self.last_activity_ts); + let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true); + f.debug_struct("LocalLayerInfoForDiskUsageEviction") + .field("layer", &self.layer) + .field("last_activity", &ts) + .finish() + } +} + +impl LocalLayerInfoForDiskUsageEviction { + pub fn file_size(&self) -> u64 { + self.layer + .file_size() + .expect("we know this is a local layer") + } +} + +impl Timeline { + pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().unwrap(); + + let mut max_layer_size = ApproxAccurate::default(); + let mut resident_layers = Vec::new(); + + for l in layers.iter_historic_layers() { + max_layer_size = max_layer_size.max(l.file_size()); + + if l.is_remote_layer() { + continue; + } + + let last_activity_ts = l.access_stats().latest_activity(); + + resident_layers.push(LocalLayerInfoForDiskUsageEviction { + layer: l, + last_activity_ts, + }); + } + + DiskUsageEvictionInfo { + max_layer_size, + resident_layers, + } + } +} + type TraversalPathItem = ( ValueReconstructResult, Lsn, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 2aad0ef0f3..41aae8dba0 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -6,7 +6,6 @@ use std::{ time::{Duration, SystemTime}, }; -use either::Either; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; @@ -126,13 +125,7 @@ impl Timeline { if hist_layer.is_remote_layer() { continue; } - let last_activity_ts = match hist_layer - .access_stats() - .most_recent_access_or_residence_event() - { - Either::Left(mra) => mra.when, - Either::Right(re) => re.timestamp, - }; + let last_activity_ts = hist_layer.access_stats().latest_activity(); let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, Err(_e) => { diff --git 
a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6429b1e940..db80057c71 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1214,6 +1214,14 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) return TenantConfig.from_json(res.json()) + def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): + assert "tenant_id" not in config.keys() + res = self.put( + f"http://localhost:{self.port}/v1/tenant/config", + json={**config, "tenant_id": str(tenant_id)}, + ) + self.verbose_error(res) + def tenant_size(self, tenant_id: TenantId) -> int: return self.tenant_size_and_modelinputs(tenant_id)[0] @@ -1530,6 +1538,14 @@ class PageserverHttpClient(requests.Session): for layer in info.historic_layers: self.evict_layer(tenant_id, timeline_id, layer.layer_file_name) + def disk_usage_eviction_run(self, request: dict[str, Any]): + res = self.put( + f"http://localhost:{self.port}/v1/disk_usage_eviction/run", + json=request, + ) + self.verbose_error(res) + return res.json() + @dataclass class TenantConfig: diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py new file mode 100644 index 0000000000..68efbd217b --- /dev/null +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -0,0 +1,330 @@ +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Iterator, Tuple + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + LayerMapInfo, + NeonEnv, + NeonEnvBuilder, + PageserverHttpClient, + PgBin, + RemoteStorageKind, + wait_for_last_flush_lsn, +) +from fixtures.types import TenantId, TimelineId + + +@pytest.mark.parametrize("config_level_override", [None, 400]) +def test_min_resident_size_override_handling( + neon_env_builder: NeonEnvBuilder, config_level_override: int +): + env = neon_env_builder.init_start() + ps_http = env.pageserver.http_client() + + def assert_config(tenant_id, expect_override, expect_effective): + config = ps_http.tenant_config(tenant_id) + assert config.tenant_specific_overrides.get("min_resident_size_override") == expect_override + assert config.effective_config.get("min_resident_size_override") == expect_effective + + def assert_overrides(tenant_id, default_tenant_conf_value): + ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 200}) + assert_config(tenant_id, 200, 200) + + ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 0}) + assert_config(tenant_id, 0, 0) + + ps_http.set_tenant_config(tenant_id, {}) + assert_config(tenant_id, None, default_tenant_conf_value) + + env.pageserver.stop() + if config_level_override is not None: + env.pageserver.start( + overrides=( + "--pageserver-config-override=tenant_config={ min_resident_size_override = " + + str(config_level_override) + + " }", + ) + ) + else: + env.pageserver.start() + + tenant_id, _ = env.neon_cli.create_tenant() + assert_overrides(tenant_id, config_level_override) + + # Also ensure that specifying the paramter to create_tenant works, in addition to http-level recconfig. 
+ tenant_id, _ = env.neon_cli.create_tenant(conf={"min_resident_size_override": "100"}) + assert_config(tenant_id, 100, 100) + ps_http.set_tenant_config(tenant_id, {}) + assert_config(tenant_id, None, config_level_override) + + +@dataclass +class EvictionEnv: + timelines: list[Tuple[TenantId, TimelineId, LayerMapInfo]] + neon_env: NeonEnv + pg_bin: PgBin + pageserver_http: PageserverHttpClient + layer_size: int + + def timelines_du(self) -> Tuple[int, int, int]: + return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid, _ in self.timelines]) + + def du_by_timeline(self) -> dict[Tuple[TenantId, TimelineId], int]: + return { + (tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)])[0] + for tid, tlid, _ in self.timelines + } + + +@pytest.fixture +def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Iterator[EvictionEnv]: + """ + Creates two tenants, one somewhat larger than the other. + """ + + log.info(f"setting up eviction_env for test {request.node.name}") + + neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}") + + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # allow because we are invoking this manually; we always warn on executing disk based eviction + env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*") + env.pageserver.allowed_errors.append( + r".* Changing Active tenant to Broken state, reason: broken from test" + ) + + # break the difficult to use initial default tenant, later assert that it has not been evicted + broken_tenant_id, broken_timeline_id = (env.initial_tenant, env.initial_timeline) + assert broken_timeline_id is not None + res = pageserver_http.put( + f"http://localhost:{pageserver_http.port}/v1/tenant/{env.initial_tenant}/break" + ) + pageserver_http.verbose_error(res) + (broken_on_disk_before, _, _) = poor_mans_du( + env, timelines=[(broken_tenant_id, broken_timeline_id)] + ) + + timelines = [] + + # Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers. + # Large count of layers and small layer size is good for testing because it makes evictions predictable. + # Predictable in the sense that many layer evictions will be required to reach the eviction target, because + # each eviction only makes small progress. That means little overshoot, and thereby stable asserts. 
+ pgbench_scales = [4, 6] + layer_size = 5 * 1024**2 + + for scale in pgbench_scales: + tenant_id, timeline_id = env.neon_cli.create_tenant( + conf={ + "gc_period": "0s", + "compaction_period": "0s", + "checkpoint_distance": f"{layer_size}", + "image_creation_threshold": "100", + "compaction_target_size": f"{layer_size}", + } + ) + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + pg_bin.run(["pgbench", "-i", f"-s{scale}", pg.connstr()]) + wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + layers = pageserver_http.layer_map_info(tenant_id, timeline_id) + log.info(f"{layers}") + assert len(layers.historic_layers) >= 4 + + timelines.append((tenant_id, timeline_id, layers)) + + eviction_env = EvictionEnv( + timelines=timelines, neon_env=env, pageserver_http=pageserver_http, layer_size=layer_size, pg_bin=pg_bin + ) + + yield eviction_env + + (broken_on_disk_after, _, _) = poor_mans_du( + eviction_env.neon_env, [(broken_tenant_id, broken_timeline_id)] + ) + + assert ( + broken_on_disk_before == broken_on_disk_after + ), "only touch active tenants with disk_usage_eviction" + + +def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv): + """ + Basic test to ensure that we evict enough to relieve pressure. + """ + env = eviction_env + pageserver_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + + target = total_on_disk // 2 + + response = pageserver_http.disk_usage_eviction_run({"wanted_trimmed_bytes": target}) + log.info(f"{response}") + + (later_total_on_disk, _, _) = env.timelines_du() + + actual_change = total_on_disk - later_total_on_disk + + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "must evict more than half" + assert ( + response["Finished"]["assumed"]["projected_after"]["freed_bytes"] >= actual_change + ), "report accurately evicted bytes" + assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected" + + +def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv): + """ + Override tenant min resident and ensure that it will be respected by eviction. 
+ """ + env = eviction_env + ps_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + du_by_timeline = env.du_by_timeline() + log.info("du_by_timeline: %s", du_by_timeline) + + assert len(du_by_timeline) == 2, "this test assumes two tenants" + large_tenant = max(du_by_timeline, key=du_by_timeline.__getitem__) + small_tenant = min(du_by_timeline, key=du_by_timeline.__getitem__) + assert du_by_timeline[large_tenant] > du_by_timeline[small_tenant] + assert ( + du_by_timeline[large_tenant] - du_by_timeline[small_tenant] > 5 * env.layer_size + ), "ensure this test will do more than 1 eviction" + + # give the larger tenant a haircut while prevening the smaller tenant from getting one + min_resident_size = du_by_timeline[small_tenant] + target = du_by_timeline[large_tenant] - du_by_timeline[small_tenant] + assert any( + [du > min_resident_size for du in du_by_timeline.values()] + ), "ensure the larger tenant will get a haircut" + + ps_http.set_tenant_config(small_tenant[0], {"min_resident_size_override": min_resident_size}) + ps_http.set_tenant_config(large_tenant[0], {"min_resident_size_override": min_resident_size}) + + # do one run + response = ps_http.disk_usage_eviction_run({"wanted_trimmed_bytes": target}) + log.info(f"{response}") + + time.sleep(1) # give log time to flush + assert not env.neon_env.pageserver.log_contains( + "falling back to global LRU" + ), "this test is pointless if it fell back to global LRU" + + (later_total_on_disk, _, _) = env.timelines_du() + later_du_by_timeline = env.du_by_timeline() + log.info("later_du_by_timeline: %s", later_du_by_timeline) + + actual_change = total_on_disk - later_total_on_disk + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "eviction must always evict more than target" + assert ( + response["Finished"]["assumed"]["projected_after"]["freed_bytes"] >= actual_change + ), "report accurately evicted bytes" + assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected" + + assert ( + later_du_by_timeline[small_tenant] == du_by_timeline[small_tenant] + ), "small tenant sees no haircut" + assert ( + later_du_by_timeline[large_tenant] < du_by_timeline[large_tenant] + ), "large tenant gets a haircut" + assert du_by_timeline[large_tenant] - later_du_by_timeline[large_tenant] >= target + + +def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv): + """ + The pageserver should fall back to global LRU if the tenant_min_resident_size-respecting eviction + wouldn't evict enough. 
+ """ + env = eviction_env + ps_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + target = total_on_disk + + response = ps_http.disk_usage_eviction_run({"wanted_trimmed_bytes": target}) + log.info(f"{response}") + + (later_total_on_disk, _, _) = env.timelines_du() + actual_change = total_on_disk - later_total_on_disk + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "eviction must always evict more than target" + + time.sleep(1) # give log time to flush + assert env.neon_env.pageserver.log_contains("falling back to global LRU") + env.neon_env.pageserver.allowed_errors.append(".*falling back to global LRU") + +def test_partial_evict_tenant(eviction_env: EvictionEnv): + + env = eviction_env + ps_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + du_by_timeline = env.du_by_timeline() + + # pick any tenant + [our_tenant, other_tenant] = list(du_by_timeline.keys()) + (tenant_id, timeline_id) = our_tenant + tenant_usage = du_by_timeline[our_tenant] + + # make our tenant more recently used than the other one + with env.neon_env.postgres.create_start("main", tenant_id=tenant_id) as pg: + env.pg_bin.run(["pgbench", "-S" , pg.connstr()]) + + target = total_on_disk - (tenant_usage//2) + response = ps_http.disk_usage_eviction_run({"wanted_trimmed_bytes": target}) + log.info(f"{response}") + + (later_total_on_disk, _, _) = env.timelines_du() + actual_change = total_on_disk - later_total_on_disk + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "eviction must always evict more than target" + + later_du_by_timeline = env.du_by_timeline() + for tenant, later_tenant_usage in later_du_by_timeline.items(): + assert later_tenant_usage < du_by_timeline[tenant], "all tenants should have lost some layers" + + assert later_du_by_timeline[our_tenant] > 0.4 * tenant_usage, "our warmed up tenant should be at about half capacity" + assert later_du_by_timeline[other_tenant] < 2 * env.layer_size, "the other tenant should be completely evicted" + + +def poor_mans_du( + env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]] +) -> Tuple[int, int, int]: + """ + Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples; + this could be done over layers endpoint just as well. + """ + total_on_disk = 0 + largest_layer = 0 + smallest_layer = None + for tenant_id, timeline_id in timelines: + dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) + assert dir.exists(), f"timeline dir does not exist: {dir}" + sum = 0 + for file in dir.iterdir(): + if "__" not in file.name: + continue + size = file.stat().st_size + sum += size + largest_layer = max(largest_layer, size) + if smallest_layer: + smallest_layer = min(smallest_layer, size) + else: + smallest_layer = size + log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}") + + log.info(f"{tenant_id}/{timeline_id}: sum {sum}") + total_on_disk += sum + + assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0 + return (total_on_disk, largest_layer, smallest_layer or 0)