diff --git a/.github/ansible/staging.eu-west-1.hosts.yaml b/.github/ansible/staging.eu-west-1.hosts.yaml index b537795704..e8d0bb1dc7 100644 --- a/.github/ansible/staging.eu-west-1.hosts.yaml +++ b/.github/ansible/staging.eu-west-1.hosts.yaml @@ -8,6 +8,14 @@ storage: pg_distrib_dir: /usr/local metric_collection_endpoint: http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events metric_collection_interval: 10min + disk_usage_based_eviction: + max_usage_pct: 80 + # TODO: learn typical resident-size growth rate [GiB/minute] and configure + # min_avail_bytes such that we have X minutes of headroom. + min_avail_bytes: 0 + # We assume that the worst-case growth rate is small enough that we can + # catch above-threshold conditions by checking every 10s. + period: "10s" tenant_config: eviction_policy: kind: "LayerAccessThreshold" diff --git a/.github/ansible/staging.us-east-2.hosts.yaml b/.github/ansible/staging.us-east-2.hosts.yaml index cd8f832af0..4ef51651fc 100644 --- a/.github/ansible/staging.us-east-2.hosts.yaml +++ b/.github/ansible/staging.us-east-2.hosts.yaml @@ -8,6 +8,14 @@ storage: pg_distrib_dir: /usr/local metric_collection_endpoint: http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events metric_collection_interval: 10min + disk_usage_based_eviction: + max_usage_pct: 80 + # TODO: learn typical resident-size growth rate [GiB/minute] and configure + # min_avail_bytes such that we have X minutes of headroom. + min_avail_bytes: 0 + # We assume that the worst-case growth rate is small enough that we can + # catch above-threshold conditions by checking every 10s. + period: "10s" tenant_config: eviction_policy: kind: "LayerAccessThreshold" diff --git a/Cargo.lock b/Cargo.lock index a19a97a40d..4590e76014 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2474,6 +2474,7 @@ dependencies = [ "strum", "strum_macros", "svg_fmt", + "sync_wrapper", "tempfile", "tenant_size_model", "thiserror", @@ -4556,6 +4557,7 @@ dependencies = [ "once_cell", "pin-project-lite", "rand", + "regex", "routerify", "sentry", "serde", diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3c66400a05..094069e4c0 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -363,6 +363,11 @@ impl PageServerNode { .map(|x| serde_json::from_str(x)) .transpose() .context("Failed to parse 'eviction_policy' json")?, + min_resident_size_override: settings + .remove("min_resident_size_override") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'min_resident_size_override' as integer")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") @@ -435,6 +440,11 @@ impl PageServerNode { .map(|x| serde_json::from_str(x)) .transpose() .context("Failed to parse 'eviction_policy' json")?, + min_resident_size_override: settings + .get("min_resident_size_override") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'min_resident_size_override' as an integer")?, }) .send()? .error_from_body()?; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 0f860d0a6d..98a4b56858 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -120,6 +120,7 @@ pub struct TenantCreateRequest { // We might do that once the eviction feature has stabilizied. // For now, this field is not even documented in the openapi_spec.yml. 
pub eviction_policy: Option, + pub min_resident_size_override: Option, } #[serde_as] @@ -165,6 +166,7 @@ pub struct TenantConfigRequest { // We might do that once the eviction feature has stabilizied. // For now, this field is not even documented in the openapi_spec.yml. pub eviction_policy: Option, + pub min_resident_size_override: Option, } impl TenantConfigRequest { @@ -185,6 +187,7 @@ impl TenantConfigRequest { max_lsn_wal_lag: None, trace_read_requests: None, eviction_policy: None, + min_resident_size_override: None, } } } diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index b9f67e82f8..391bc52a80 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -19,6 +19,7 @@ jsonwebtoken.workspace = true nix.workspace = true once_cell.workspace = true pin-project-lite.workspace = true +regex.workspace = true routerify.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 766d759ab4..d4176911ac 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -51,6 +51,9 @@ pub mod history_buffer; pub mod measured_stream; +pub mod serde_percent; +pub mod serde_regex; + /// use with fail::cfg("$name", "return(2000)") #[macro_export] macro_rules! failpoint_sleep_millis_async { diff --git a/libs/utils/src/serde_percent.rs b/libs/utils/src/serde_percent.rs new file mode 100644 index 0000000000..63b62b5f1e --- /dev/null +++ b/libs/utils/src/serde_percent.rs @@ -0,0 +1,83 @@ +//! A serde::Deserialize type for percentages. +//! +//! See [`Percent`] for details. + +use serde::{Deserialize, Serialize}; + +/// If the value is not an integer between 0 and 100, +/// deserialization fails with a descriptive error. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8); + +impl Percent { + pub fn get(&self) -> u8 { + self.0 + } +} + +fn deserialize_pct_0_to_100<'de, D>(deserializer: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let v: u8 = serde::de::Deserialize::deserialize(deserializer)?; + if v > 100 { + return Err(serde::de::Error::custom( + "must be an integer between 0 and 100", + )); + } + Ok(v) +} + +#[cfg(test)] +mod tests { + use super::Percent; + + #[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq, Eq)] + struct Foo { + bar: Percent, + } + + #[test] + fn basics() { + let input = r#"{ "bar": 50 }"#; + let foo: Foo = serde_json::from_str(input).unwrap(); + assert_eq!(foo.bar.get(), 50); + } + #[test] + fn null_handling() { + let input = r#"{ "bar": null }"#; + let res: Result = serde_json::from_str(input); + assert!(res.is_err()); + } + #[test] + fn zero() { + let input = r#"{ "bar": 0 }"#; + let foo: Foo = serde_json::from_str(input).unwrap(); + assert_eq!(foo.bar.get(), 0); + } + #[test] + fn out_of_range_above() { + let input = r#"{ "bar": 101 }"#; + let res: Result = serde_json::from_str(input); + assert!(res.is_err()); + } + #[test] + fn out_of_range_below() { + let input = r#"{ "bar": -1 }"#; + let res: Result = serde_json::from_str(input); + assert!(res.is_err()); + } + #[test] + fn float() { + let input = r#"{ "bar": 50.5 }"#; + let res: Result = serde_json::from_str(input); + assert!(res.is_err()); + } + #[test] + fn string() { + let input = r#"{ "bar": "50 %" }"#; + let res: Result = serde_json::from_str(input); + assert!(res.is_err()); + } +} diff --git a/libs/utils/src/serde_regex.rs b/libs/utils/src/serde_regex.rs new 
file mode 100644 index 0000000000..95ea4f8e44 --- /dev/null +++ b/libs/utils/src/serde_regex.rs @@ -0,0 +1,60 @@ +//! A `serde::{Deserialize,Serialize}` type for regexes. + +use std::ops::Deref; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(transparent)] +pub struct Regex( + #[serde( + deserialize_with = "deserialize_regex", + serialize_with = "serialize_regex" + )] + regex::Regex, +); + +fn deserialize_regex<'de, D>(deserializer: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let s: String = serde::de::Deserialize::deserialize(deserializer)?; + let re = regex::Regex::new(&s).map_err(serde::de::Error::custom)?; + Ok(re) +} + +fn serialize_regex(re: ®ex::Regex, serializer: S) -> Result +where + S: serde::ser::Serializer, +{ + serializer.collect_str(re.as_str()) +} + +impl Deref for Regex { + type Target = regex::Regex; + + fn deref(&self) -> ®ex::Regex { + &self.0 + } +} + +impl PartialEq for Regex { + fn eq(&self, other: &Regex) -> bool { + // comparing the automatons would be quite complicated + self.as_str() == other.as_str() + } +} + +impl Eq for Regex {} + +#[cfg(test)] +mod tests { + + #[test] + fn roundtrip() { + let input = r#""foo.*bar""#; + let re: super::Regex = serde_json::from_str(input).unwrap(); + assert!(re.is_match("foo123bar")); + assert!(!re.is_match("foo")); + let output = serde_json::to_string(&re).unwrap(); + assert_eq!(output, input); + } +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 8d6641a387..0bc7eba95e 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -48,6 +48,7 @@ serde_json = { workspace = true, features = ["raw_value"] } serde_with.workspace = true signal-hook.workspace = true svg_fmt.workspace = true +sync_wrapper.workspace = true tokio-tar.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index cbfd3e1165..ed23a18ee0 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, Context}; use clap::{Arg, ArgAction, Command}; use fail::FailScenario; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; +use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use remote_storage::GenericRemoteStorage; use tracing::*; @@ -314,14 +315,34 @@ fn start_pageserver( // Scan the local 'tenants/' directory and start loading the tenants BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?; + // shared state between the disk-usage backed eviction background task and the http endpoint + // that allows triggering disk-usage based eviction manually. note that the http endpoint + // is still accessible even if background task is not configured as long as remote storage has + // been configured. + let disk_usage_eviction_state: Arc = Arc::default(); + + if let Some(remote_storage) = &remote_storage { + launch_disk_usage_global_eviction_task( + conf, + remote_storage.clone(), + disk_usage_eviction_state.clone(), + )?; + } + // Start up the service to handle HTTP mgmt API request. We created the // listener earlier already. { let _rt_guard = MGMT_REQUEST_RUNTIME.enter(); - let router = http::make_router(conf, launch_ts, http_auth, remote_storage)? 
- .build() - .map_err(|err| anyhow!(err))?; + let router = http::make_router( + conf, + launch_ts, + http_auth, + remote_storage, + disk_usage_eviction_state, + )? + .build() + .map_err(|err| anyhow!(err))?; let service = utils::http::RouterService::new(router).unwrap(); let server = hyper::Server::from_tcp(http_listener)? .serve(service) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 7293e69f69..19f0f22815 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -27,6 +27,7 @@ use utils::{ logging::LogFormat, }; +use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig; use crate::tenant::config::TenantConf; use crate::tenant::config::TenantConfOpt; use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME}; @@ -92,6 +93,8 @@ pub mod defaults { #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}' +#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}} + # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -104,6 +107,8 @@ pub mod defaults { #image_creation_threshold = {DEFAULT_IMAGE_CREATION_THRESHOLD} #pitr_interval = '{DEFAULT_PITR_INTERVAL}' +#min_resident_size_override = .. # in bytes + # [remote_storage] "### @@ -180,6 +185,8 @@ pub struct PageServerConf { // See the corresponding metric's help string. pub evictions_low_residence_duration_metric_threshold: Duration, + pub disk_usage_based_eviction: Option, + pub test_remote_failures: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, @@ -252,6 +259,8 @@ struct PageServerConfigBuilder { evictions_low_residence_duration_metric_threshold: BuilderValue, + disk_usage_based_eviction: BuilderValue>, + test_remote_failures: BuilderValue, ondemand_download_behavior_treat_error_as_warn: BuilderValue, @@ -312,6 +321,8 @@ impl Default for PageServerConfigBuilder { ) .expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")), + disk_usage_based_eviction: Set(None), + test_remote_failures: Set(0), ondemand_download_behavior_treat_error_as_warn: Set(false), @@ -431,6 +442,10 @@ impl PageServerConfigBuilder { self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value); } + pub fn disk_usage_based_eviction(&mut self, value: Option) { + self.disk_usage_based_eviction = BuilderValue::Set(value); + } + pub fn ondemand_download_behavior_treat_error_as_warn( &mut self, ondemand_download_behavior_treat_error_as_warn: bool, @@ -515,6 +530,9 @@ impl PageServerConfigBuilder { .ok_or(anyhow!( "missing evictions_low_residence_duration_metric_threshold" ))?, + disk_usage_based_eviction: self + .disk_usage_based_eviction + .ok_or(anyhow!("missing disk_usage_based_eviction"))?, test_remote_failures: self .test_remote_failures .ok_or(anyhow!("missing test_remote_failuers"))?, @@ -704,6 +722,12 @@ impl PageServerConf { builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?), "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?), "evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?), + "disk_usage_based_eviction" => { + tracing::info!("disk_usage_based_eviction: {:#?}", &item); + builder.disk_usage_based_eviction( + toml_edit::de::from_item(item.clone()) + .context("parse disk_usage_based_eviction")?) 
+ }, "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } @@ -808,6 +832,13 @@ impl PageServerConf { ); } + if let Some(item) = item.get("min_resident_size_override") { + t_conf.min_resident_size_override = Some( + toml_edit::de::from_item(item.clone()) + .context("parse min_resident_size_override")?, + ); + } + Ok(t_conf) } @@ -850,6 +881,7 @@ impl PageServerConf { defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD, ) .unwrap(), + disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, } @@ -1058,6 +1090,7 @@ log_format = 'json' evictions_low_residence_duration_metric_threshold: humantime::parse_duration( defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD )?, + disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, }, @@ -1112,6 +1145,7 @@ log_format = 'json' metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?), synthetic_size_calculation_interval: Duration::from_secs(333), evictions_low_residence_duration_metric_threshold: Duration::from_secs(444), + disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, }, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs new file mode 100644 index 0000000000..eeeb6fda89 --- /dev/null +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -0,0 +1,689 @@ +//! This module implements the pageserver-global disk-usage-based layer eviction task. +//! +//! # Mechanics +//! +//! Function `launch_disk_usage_global_eviction_task` starts a pageserver-global background +//! loop that evicts layers in response to a shortage of available bytes +//! in the $repo/tenants directory's filesystem. +//! +//! The loop runs periodically at a configurable `period`. +//! +//! Each loop iteration uses `statvfs` to determine filesystem-level space usage. +//! It compares the returned usage data against two different types of thresholds. +//! The iteration tries to evict layers until app-internal accounting says we should be below the thresholds. +//! We cross-check this internal accounting with the real world by making another `statvfs` at the end of the iteration. +//! We're good if that second statvfs shows that we're _actually_ below the configured thresholds. +//! If we're still above one or more thresholds, we emit a warning log message, leaving it to the operator to investigate further. +//! +//! # Eviction Policy +//! +//! There are two thresholds: +//! `max_usage_pct` is the relative available space, expressed in percent of the total filesystem space. +//! If the actual usage is higher, the threshold is exceeded. +//! `min_avail_bytes` is the absolute available space in bytes. +//! If the actual usage is lower, the threshold is exceeded. +//! If either of these thresholds is exceeded, the system is considered to have "disk pressure", and eviction +//! is performed on the next iteration, to release disk space and bring the usage below the thresholds again. +//! The iteration evicts layers in LRU fashion, but, with a weak reservation per tenant. +//! The reservation is to keep the most recently accessed X bytes per tenant resident. +//! If we cannot relieve pressure by evicting layers outside of the reservation, we +//! 
start evicting layers that are part of the reservation, LRU first. +//! +//! The value for the per-tenant reservation is referred to as `tenant_min_resident_size` +//! throughout the code, but, no actual variable carries that name. +//! The per-tenant default value is the `max(tenant's layer file sizes, regardless of local or remote)`. +//! The idea is to allow at least one layer to be resident per tenant, to ensure it can make forward progress +//! during page reconstruction. +//! An alternative default for all tenants can be specified in the `tenant_config` section of the config. +//! Lastly, each tenant can have an override in their respective tenant config (`min_resident_size_override`). + +// Implementation notes: +// - The `#[allow(dead_code)]` above various structs are to suppress warnings about only the Debug impl +// reading these fields. We use the Debug impl for semi-structured logging, though. + +use std::{ + collections::HashMap, + path::Path, + sync::Arc, + time::{Duration, SystemTime}, +}; + +use anyhow::Context; +use remote_storage::GenericRemoteStorage; +use serde::{Deserialize, Serialize}; +use tokio::time::Instant; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, instrument, warn, Instrument}; +use utils::serde_percent::Percent; + +use crate::{ + config::PageServerConf, + task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, + tenant::{self, storage_layer::PersistentLayer, Timeline}, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DiskUsageEvictionTaskConfig { + pub max_usage_pct: Percent, + pub min_avail_bytes: u64, + #[serde(with = "humantime_serde")] + pub period: Duration, + #[cfg(feature = "testing")] + pub mock_statvfs: Option, +} + +#[derive(Default)] +pub struct State { + /// Exclude http requests and background task from running at the same time. + mutex: tokio::sync::Mutex<()>, +} + +pub fn launch_disk_usage_global_eviction_task( + conf: &'static PageServerConf, + storage: GenericRemoteStorage, + state: Arc, +) -> anyhow::Result<()> { + let Some(task_config) = &conf.disk_usage_based_eviction else { + info!("disk usage based eviction task not configured"); + return Ok(()); + }; + + info!("launching disk usage based eviction task"); + + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::DiskUsageEviction, + None, + None, + "disk usage based eviction", + false, + async move { + disk_usage_eviction_task( + &state, + task_config, + storage, + &conf.tenants_path(), + task_mgr::shutdown_token(), + ) + .await; + info!("disk usage based eviction task finishing"); + Ok(()) + }, + ); + + Ok(()) +} + +#[instrument(skip_all)] +async fn disk_usage_eviction_task( + state: &State, + task_config: &DiskUsageEvictionTaskConfig, + storage: GenericRemoteStorage, + tenants_dir: &Path, + cancel: CancellationToken, +) { + use crate::tenant::tasks::random_init_delay; + { + if random_init_delay(task_config.period, &cancel) + .await + .is_err() + { + info!("shutting down"); + return; + } + } + + let mut iteration_no = 0; + loop { + iteration_no += 1; + let start = Instant::now(); + + async { + let res = disk_usage_eviction_task_iteration( + state, + task_config, + &storage, + tenants_dir, + &cancel, + ) + .await; + + match res { + Ok(()) => {} + Err(e) => { + // these stat failures are expected to be very rare + warn!("iteration failed, unexpected error: {e:#}"); + } + } + } + .instrument(tracing::info_span!("iteration", iteration_no)) + .await; + + let sleep_until = start + task_config.period; + tokio::select! 
{ + _ = tokio::time::sleep_until(sleep_until) => {}, + _ = cancel.cancelled() => { + info!("shutting down"); + break + } + } + } +} + +pub trait Usage: Clone + Copy + std::fmt::Debug { + fn has_pressure(&self) -> bool; + fn add_available_bytes(&mut self, bytes: u64); +} + +async fn disk_usage_eviction_task_iteration( + state: &State, + task_config: &DiskUsageEvictionTaskConfig, + storage: &GenericRemoteStorage, + tenants_dir: &Path, + cancel: &CancellationToken, +) -> anyhow::Result<()> { + let usage_pre = filesystem_level_usage::get(tenants_dir, task_config) + .context("get filesystem-level disk usage before evictions")?; + let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await; + match res { + Ok(outcome) => { + debug!(?outcome, "disk_usage_eviction_iteration finished"); + match outcome { + IterationOutcome::NoPressure | IterationOutcome::Cancelled => { + // nothing to do, select statement below will handle things + } + IterationOutcome::Finished(outcome) => { + // Verify with statvfs whether we made any real progress + let after = filesystem_level_usage::get(tenants_dir, task_config) + // It's quite unlikely to hit the error here. Keep the code simple and bail out. + .context("get filesystem-level disk usage after evictions")?; + + debug!(?after, "disk usage"); + + if after.has_pressure() { + // Don't bother doing an out-of-order iteration here now. + // In practice, the task period is set to a value in the tens-of-seconds range, + // which will cause another iteration to happen soon enough. + // TODO: deltas between the three different usages would be helpful, + // consider MiB, GiB, TiB + warn!(?outcome, ?after, "disk usage still high"); + } else { + info!(?outcome, ?after, "disk usage pressure relieved"); + } + } + } + } + Err(e) => { + error!("disk_usage_eviction_iteration failed: {:#}", e); + } + } + + Ok(()) +} + +#[derive(Debug, Serialize)] +#[allow(clippy::large_enum_variant)] +pub enum IterationOutcome { + NoPressure, + Cancelled, + Finished(IterationOutcomeFinished), +} + +#[allow(dead_code)] +#[derive(Debug, Serialize)] +pub struct IterationOutcomeFinished { + /// The actual usage observed before we started the iteration. + before: U, + /// The expected value for `after`, according to internal accounting, after phase 1. + planned: PlannedUsage, + /// The outcome of phase 2, where we actually do the evictions. + /// + /// If all layers that phase 1 planned to evict _can_ actually get evicted, this will + /// be the same as `planned`. + assumed: AssumedUsage, +} + +#[derive(Debug, Serialize)] +#[allow(dead_code)] +struct AssumedUsage { + /// The expected value for `after`, after phase 2. + projected_after: U, + /// The layers we failed to evict during phase 2. 
+ failed: LayerCount, +} + +#[allow(dead_code)] +#[derive(Debug, Serialize)] +struct PlannedUsage { + respecting_tenant_min_resident_size: U, + fallback_to_global_lru: Option, +} + +#[allow(dead_code)] +#[derive(Debug, Default, Serialize)] +struct LayerCount { + file_sizes: u64, + count: usize, +} + +pub async fn disk_usage_eviction_task_iteration_impl( + state: &State, + storage: &GenericRemoteStorage, + usage_pre: U, + cancel: &CancellationToken, +) -> anyhow::Result> { + // use tokio's mutex to get a Sync guard (instead of std::sync::Mutex) + let _g = state + .mutex + .try_lock() + .map_err(|_| anyhow::anyhow!("iteration is already executing"))?; + + debug!(?usage_pre, "disk usage"); + + if !usage_pre.has_pressure() { + return Ok(IterationOutcome::NoPressure); + } + + warn!( + ?usage_pre, + "running disk usage based eviction due to pressure" + ); + + let candidates = match collect_eviction_candidates(cancel).await? { + EvictionCandidates::Cancelled => { + return Ok(IterationOutcome::Cancelled); + } + EvictionCandidates::Finished(partitioned) => partitioned, + }; + + // Debug-log the list of candidates + let now = SystemTime::now(); + for (i, (partition, candidate)) in candidates.iter().enumerate() { + debug!( + "cand {}/{}: size={}, no_access_for={}us, parition={:?}, tenant={} timeline={} layer={}", + i + 1, + candidates.len(), + candidate.layer.file_size(), + now.duration_since(candidate.last_activity_ts) + .unwrap() + .as_micros(), + partition, + candidate.layer.get_tenant_id(), + candidate.layer.get_timeline_id(), + candidate.layer.filename().file_name(), + ); + } + + // phase1: select victims to relieve pressure + // + // Walk through the list of candidates, until we have accumulated enough layers to get + // us back under the pressure threshold. 'usage_planned' is updated so that it tracks + // how much disk space would be used after evicting all the layers up to the current + // point in the list. The layers are collected in 'batched', grouped per timeline. + // + // If we get far enough in the list that we start to evict layers that are below + // the tenant's min-resident-size threshold, print a warning, and memorize the disk + // usage at that point, in 'usage_planned_min_resident_size_respecting'. + let mut batched: HashMap<_, Vec>> = HashMap::new(); + let mut warned = None; + let mut usage_planned = usage_pre; + for (i, (partition, candidate)) in candidates.into_iter().enumerate() { + if !usage_planned.has_pressure() { + debug!( + no_candidates_evicted = i, + "took enough candidates for pressure to be relieved" + ); + break; + } + + if partition == MinResidentSizePartition::Below && warned.is_none() { + warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"); + warned = Some(usage_planned); + } + + usage_planned.add_available_bytes(candidate.layer.file_size()); + + batched + .entry(TimelineKey(candidate.timeline)) + .or_default() + .push(candidate.layer); + } + + let usage_planned = match warned { + Some(respecting_tenant_min_resident_size) => PlannedUsage { + respecting_tenant_min_resident_size, + fallback_to_global_lru: Some(usage_planned), + }, + None => PlannedUsage { + respecting_tenant_min_resident_size: usage_planned, + fallback_to_global_lru: None, + }, + }; + debug!(?usage_planned, "usage planned"); + + // phase2: evict victims batched by timeline + + // After the loop, `usage_assumed` is the post-eviction usage, + // according to internal accounting. 
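+        // Layers that cannot be evicted are tallied in `evictions_failed` below and
+        // are not counted toward `usage_assumed`.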
+ let mut usage_assumed = usage_pre; + let mut evictions_failed = LayerCount::default(); + for (timeline, batch) in batched { + let tenant_id = timeline.tenant_id; + let timeline_id = timeline.timeline_id; + let batch_size = batch.len(); + + debug!(%timeline_id, "evicting batch for timeline"); + + async { + let results = timeline.evict_layers(storage, &batch, cancel.clone()).await; + + match results { + Err(e) => { + warn!("failed to evict batch: {:#}", e); + } + Ok(results) => { + assert_eq!(results.len(), batch.len()); + for (result, layer) in results.into_iter().zip(batch.iter()) { + match result { + Some(Ok(true)) => { + usage_assumed.add_available_bytes(layer.file_size()); + } + Some(Ok(false)) => { + // this is: + // - Replacement::{NotFound, Unexpected} + // - it cannot be is_remote_layer, filtered already + evictions_failed.file_sizes += layer.file_size(); + evictions_failed.count += 1; + } + None => { + assert!(cancel.is_cancelled()); + return; + } + Some(Err(e)) => { + // we really shouldn't be getting this, precondition failure + error!("failed to evict layer: {:#}", e); + } + } + } + } + } + } + .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size)) + .await; + + if cancel.is_cancelled() { + return Ok(IterationOutcome::Cancelled); + } + } + + Ok(IterationOutcome::Finished(IterationOutcomeFinished { + before: usage_pre, + planned: usage_planned, + assumed: AssumedUsage { + projected_after: usage_assumed, + failed: evictions_failed, + }, + })) +} + +#[derive(Clone)] +struct EvictionCandidate { + timeline: Arc, + layer: Arc, + last_activity_ts: SystemTime, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum MinResidentSizePartition { + Above, + Below, +} + +enum EvictionCandidates { + Cancelled, + Finished(Vec<(MinResidentSizePartition, EvictionCandidate)>), +} + +/// Gather the eviction candidates. +/// +/// The returned `Ok(EvictionCandidates::Finished(candidates))` is sorted in eviction +/// order. A caller that evicts in that order, until pressure is relieved, implements +/// the eviction policy outlined in the module comment. +/// +/// # Example +/// +/// Imagine that there are two tenants, A and B, with five layers each, a-e. +/// Each layer has size 100, and both tenant's min_resident_size is 150. +/// The eviction order would be +/// +/// ```text +/// partition last_activity_ts tenant/layer +/// Above 18:30 A/c +/// Above 19:00 A/b +/// Above 18:29 B/c +/// Above 19:05 B/b +/// Above 20:00 B/a +/// Above 20:03 A/a +/// Below 20:30 A/d +/// Below 20:40 B/d +/// Below 20:45 B/e +/// Below 20:58 A/e +/// ``` +/// +/// Now, if we need to evict 300 bytes to relieve pressure, we'd evict `A/c, A/b, B/c`. +/// They are all in the `Above` partition, so, we respected each tenant's min_resident_size. +/// +/// But, if we need to evict 900 bytes to relieve pressure, we'd evict +/// `A/c, A/b, B/c, B/b, B/a, A/a, A/d, B/d, B/e`, reaching into the `Below` partition +/// after exhauting the `Above` partition. +/// So, we did not respect each tenant's min_resident_size. 
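The function below gathers and orders the candidates. As a minimal, self-contained sketch of the ordering it relies on (hypothetical `Partition` enum and sample timestamps standing in for the real types; the actual code sorts `(MinResidentSizePartition, SystemTime)` keys the same way, and the derived `Ord` makes `Above` sort before `Below`):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Stand-in for MinResidentSizePartition; derive(Ord) orders variants by
// declaration order, so Above < Below and the Above partition drains first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Partition {
    Above, // outside the tenant's min_resident_size reservation
    Below, // inside the reservation; only evicted as a fallback
}

fn main() {
    // helper: a timestamp `mins` minutes after the epoch
    let ts = |mins: u64| UNIX_EPOCH + Duration::from_secs(mins * 60);
    // (partition, last_activity_ts, "tenant/layer")
    let mut candidates = vec![
        (Partition::Below, ts(1230), "A/d"),
        (Partition::Above, ts(1110), "A/c"),
        (Partition::Above, ts(1140), "A/b"),
        (Partition::Below, ts(1240), "B/d"),
        (Partition::Above, ts(1109), "B/c"),
    ];
    // Same two-level key as the real code: partition first, then LRU order.
    candidates.sort_unstable_by_key(|(partition, last_activity_ts, _)| {
        (*partition, *last_activity_ts)
    });
    // Prints B/c, A/c, A/b (all Above, least recently used first),
    // then A/d, B/d (Below, least recently used first).
    for (partition, _, name) in &candidates {
        println!("{partition:?} {name}");
    }
}
```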
+async fn collect_eviction_candidates( + cancel: &CancellationToken, +) -> anyhow::Result { + // get a snapshot of the list of tenants + let tenants = tenant::mgr::list_tenants() + .await + .context("get list of tenants")?; + + let mut candidates = Vec::new(); + + for (tenant_id, _state) in &tenants { + if cancel.is_cancelled() { + return Ok(EvictionCandidates::Cancelled); + } + let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await { + Ok(tenant) => tenant, + Err(e) => { + // this can happen if tenant has lifecycle transition after we fetched it + debug!("failed to get tenant: {e:#}"); + continue; + } + }; + + // collect layers from all timelines in this tenant + // + // If one of the timelines becomes `!is_active()` during the iteration, + // for example because we're shutting down, then `max_layer_size` can be too small. + // That's OK. This code only runs under a disk pressure situation, and being + // a little unfair to tenants during shutdown in such a situation is tolerable. + let mut tenant_candidates = Vec::new(); + let mut max_layer_size = 0; + for tl in tenant.list_timelines() { + if !tl.is_active() { + continue; + } + let info = tl.get_local_layers_for_disk_usage_eviction(); + debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); + tenant_candidates.extend( + info.resident_layers + .into_iter() + .map(|layer_infos| (tl.clone(), layer_infos)), + ); + max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0)); + + if cancel.is_cancelled() { + return Ok(EvictionCandidates::Cancelled); + } + } + + // `min_resident_size` defaults to maximum layer file size of the tenant. + // This ensures that each tenant can have at least one layer resident at a given time, + // ensuring forward progress for a single Timeline::get in that tenant. + // It's a questionable heuristic since, usually, there are many Timeline::get + // requests going on for a tenant, and, at least in Neon prod, the median + // layer file size is much smaller than the compaction target size. + // We could be better here, e.g., sum of all L0 layers + most recent L1 layer. + // That's what's typically used by the various background loops. + // + // The default can be overriden with a fixed value in the tenant conf. + // A default override can be put in the default tenant conf in the pageserver.toml. + let min_resident_size = if let Some(s) = tenant.get_min_resident_size_override() { + debug!( + tenant_id=%tenant.tenant_id(), + overriden_size=s, + "using overridden min resident size for tenant" + ); + s + } else { + debug!( + tenant_id=%tenant.tenant_id(), + max_layer_size, + "using max layer size as min_resident_size for tenant", + ); + max_layer_size + }; + + // Sort layers most-recently-used first, then partition by + // cumsum above/below min_resident_size. 
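+        // Illustration of the cumsum partitioning, using the example from the
+        // doc comment above (min_resident_size = 150, each layer 100 bytes):
+        // the most recent layer sees cumsum 0 and the next one cumsum 100,
+        // both <= 150, so they land in `Below` (protected); every older layer
+        // sees cumsum > 150 and lands in `Above`, to be evicted first.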
+ tenant_candidates + .sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts)); + let mut cumsum: i128 = 0; + for (timeline, layer_info) in tenant_candidates.into_iter() { + let file_size = layer_info.file_size(); + let candidate = EvictionCandidate { + timeline, + last_activity_ts: layer_info.last_activity_ts, + layer: layer_info.layer, + }; + let partition = if cumsum > min_resident_size as i128 { + MinResidentSizePartition::Above + } else { + MinResidentSizePartition::Below + }; + candidates.push((partition, candidate)); + cumsum += i128::from(file_size); + } + } + + debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below, + "as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first"); + candidates + .sort_unstable_by_key(|(partition, candidate)| (*partition, candidate.last_activity_ts)); + + Ok(EvictionCandidates::Finished(candidates)) +} + +struct TimelineKey(Arc); + +impl PartialEq for TimelineKey { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.0, &other.0) + } +} + +impl Eq for TimelineKey {} + +impl std::hash::Hash for TimelineKey { + fn hash(&self, state: &mut H) { + Arc::as_ptr(&self.0).hash(state); + } +} + +impl std::ops::Deref for TimelineKey { + type Target = Timeline; + + fn deref(&self) -> &Self::Target { + self.0.as_ref() + } +} + +mod filesystem_level_usage { + use std::path::Path; + + use anyhow::Context; + + use crate::statvfs::Statvfs; + + use super::DiskUsageEvictionTaskConfig; + + #[derive(Debug, Clone, Copy)] + #[allow(dead_code)] + pub struct Usage<'a> { + config: &'a DiskUsageEvictionTaskConfig, + + /// Filesystem capacity + total_bytes: u64, + /// Free filesystem space + avail_bytes: u64, + } + + impl super::Usage for Usage<'_> { + fn has_pressure(&self) -> bool { + let usage_pct = + (100.0 * (1.0 - ((self.avail_bytes as f64) / (self.total_bytes as f64)))) as u64; + + let pressures = [ + ( + "min_avail_bytes", + self.avail_bytes < self.config.min_avail_bytes, + ), + ( + "max_usage_pct", + usage_pct > self.config.max_usage_pct.get() as u64, + ), + ]; + + pressures.into_iter().any(|(_, has_pressure)| has_pressure) + } + + fn add_available_bytes(&mut self, bytes: u64) { + self.avail_bytes += bytes; + } + } + + pub fn get<'a>( + tenants_dir: &Path, + config: &'a DiskUsageEvictionTaskConfig, + ) -> anyhow::Result> { + let mock_config = { + #[cfg(feature = "testing")] + { + config.mock_statvfs.as_ref() + } + #[cfg(not(feature = "testing"))] + { + None + } + }; + + let stat = Statvfs::get(tenants_dir, mock_config) + .context("statvfs failed, presumably directory got unlinked")?; + + // https://unix.stackexchange.com/a/703650 + let blocksize = if stat.fragment_size() > 0 { + stat.fragment_size() + } else { + stat.block_size() + }; + + // use blocks_available (b_avail) since, pageserver runs as unprivileged user + let avail_bytes = stat.blocks_available() * blocksize; + let total_bytes = stat.blocks() * blocksize; + + Ok(Usage { + config, + total_bytes, + avail_bytes, + }) + } +} diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index eda4a60e95..478e9d228a 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -27,6 +27,31 @@ paths: id: type: integer + /v1/disk_usage_eviction/run: + put: + description: Do an iteration of disk-usage-based eviction to evict a given amount of disk space. 
+ security: [] + requestBody: + content: + application/json: + schema: + type: object + required: + - evict_bytes + properties: + evict_bytes: + type: integer + responses: + "200": + description: | + The run completed. + This does not necessarily mean that we actually evicted `evict_bytes`. + Examine the returned object for detail, or, just watch the actual effect of the call using `du` or `df`. + content: + application/json: + schema: + type: object + /v1/tenant/{tenant_id}: parameters: - name: tenant_id diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index b0addc82f1..2db60f557d 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -18,6 +18,7 @@ use super::models::{ TimelineCreateRequest, TimelineGcRequest, TimelineInfo, }; use crate::context::{DownloadBehavior, RequestContext}; +use crate::disk_usage_eviction_task; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; @@ -48,6 +49,7 @@ struct State { auth: Option>, allowlist_routes: Vec, remote_storage: Option, + disk_usage_eviction_state: Arc, } impl State { @@ -55,6 +57,7 @@ impl State { conf: &'static PageServerConf, auth: Option>, remote_storage: Option, + disk_usage_eviction_state: Arc, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] .iter() @@ -65,6 +68,7 @@ impl State { auth, allowlist_routes, remote_storage, + disk_usage_eviction_state, }) } } @@ -775,6 +779,8 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?; + + let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) + .await + .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; + + tenant.set_broken("broken from test"); + + json_response(StatusCode::OK, ()) +} + #[cfg(feature = "testing")] async fn failpoints_handler(mut request: Request) -> Result, ApiError> { if !fail::has_failpoints() { @@ -1063,6 +1085,89 @@ async fn always_panic_handler(req: Request) -> Result, ApiE json_response(StatusCode::NO_CONTENT, ()) } +async fn disk_usage_eviction_run(mut r: Request) -> Result, ApiError> { + check_permission(&r, None)?; + + #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] + struct Config { + /// How many bytes to evict before reporting that pressure is relieved. 
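+        /// For example, a request body of `{"evict_bytes": 1073741824}` asks this run to
+        /// free roughly 1 GiB; eviction happens at layer granularity, so the freed amount
+        /// can overshoot.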
+ evict_bytes: u64, + } + + #[derive(Debug, Clone, Copy, serde::Serialize)] + struct Usage { + // remains unchanged after instantiation of the struct + config: Config, + // updated by `add_available_bytes` + freed_bytes: u64, + } + + impl crate::disk_usage_eviction_task::Usage for Usage { + fn has_pressure(&self) -> bool { + self.config.evict_bytes > self.freed_bytes + } + + fn add_available_bytes(&mut self, bytes: u64) { + self.freed_bytes += bytes; + } + } + + let config = json_request::(&mut r) + .await + .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?; + + let usage = Usage { + config, + freed_bytes: 0, + }; + + use crate::task_mgr::MGMT_REQUEST_RUNTIME; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + let state = get_state(&r); + + let Some(storage) = state.remote_storage.clone() else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "remote storage not configured, cannot run eviction iteration" + ))) + }; + + let state = state.disk_usage_eviction_state.clone(); + + let cancel = CancellationToken::new(); + let child_cancel = cancel.clone(); + let _g = cancel.drop_guard(); + + crate::task_mgr::spawn( + MGMT_REQUEST_RUNTIME.handle(), + TaskKind::DiskUsageEviction, + None, + None, + "ondemand disk usage eviction", + false, + async move { + let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( + &state, + &storage, + usage, + &child_cancel, + ) + .await; + + info!(?res, "disk_usage_eviction_task_iteration_impl finished"); + + let _ = tx.send(res); + Ok(()) + } + .in_current_span(), + ); + + let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, response) +} + async fn handler_404(_: Request) -> Result, ApiError> { json_response( StatusCode::NOT_FOUND, @@ -1075,6 +1180,7 @@ pub fn make_router( launch_ts: &'static LaunchTimestamp, auth: Option>, remote_storage: Option, + disk_usage_eviction_state: Arc, ) -> anyhow::Result> { let spec = include_bytes!("openapi_spec.yml"); let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc"); @@ -1119,7 +1225,8 @@ pub fn make_router( Ok(router .data(Arc::new( - State::new(conf, auth, remote_storage).context("Failed to initialize router state")?, + State::new(conf, auth, remote_storage, disk_usage_eviction_state) + .context("Failed to initialize router state")?, )) .get("/v1/status", |r| RequestSpan(status_handler).handle(r)) .put( @@ -1200,6 +1307,13 @@ pub fn make_router( "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name", |r| RequestSpan(evict_timeline_layer_handler).handle(r), ) + .put("/v1/disk_usage_eviction/run", |r| { + RequestSpan(disk_usage_eviction_run).handle(r) + }) + .put( + "/v1/tenant/:tenant_id/break", + testing_api!("set tenant state to broken", handle_tenant_break), + ) .get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r)) .any(handler_404)) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 09e21ae755..278658eba3 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -4,6 +4,7 @@ pub mod broker_client; pub mod config; pub mod consumption_metrics; pub mod context; +pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; pub mod keyspace; @@ -12,6 +13,7 @@ pub mod page_cache; pub mod page_service; pub mod pgdatadir_mapping; pub mod repository; +pub(crate) mod statvfs; pub mod task_mgr; pub mod tenant; pub mod trace; diff --git a/pageserver/src/statvfs.rs b/pageserver/src/statvfs.rs new file mode 100644 
index 0000000000..28d950b5e6 --- /dev/null +++ b/pageserver/src/statvfs.rs @@ -0,0 +1,150 @@ +//! Wrapper around nix::sys::statvfs::Statvfs that allows for mocking. + +use std::path::Path; + +pub enum Statvfs { + Real(nix::sys::statvfs::Statvfs), + Mock(mock::Statvfs), +} + +// NB: on macOS, the block count type of struct statvfs is u32. +// The workaround seems to be to use the non-standard statfs64 call. +// Sincce it should only be a problem on > 2TiB disks, let's ignore +// the problem for now and upcast to u64. +impl Statvfs { + pub fn get(tenants_dir: &Path, mocked: Option<&mock::Behavior>) -> nix::Result { + if let Some(mocked) = mocked { + Ok(Statvfs::Mock(mock::get(tenants_dir, mocked)?)) + } else { + Ok(Statvfs::Real(nix::sys::statvfs::statvfs(tenants_dir)?)) + } + } + + // NB: allow() because the block count type is u32 on macOS. + #[allow(clippy::useless_conversion)] + pub fn blocks(&self) -> u64 { + match self { + Statvfs::Real(stat) => u64::try_from(stat.blocks()).unwrap(), + Statvfs::Mock(stat) => stat.blocks, + } + } + + // NB: allow() because the block count type is u32 on macOS. + #[allow(clippy::useless_conversion)] + pub fn blocks_available(&self) -> u64 { + match self { + Statvfs::Real(stat) => u64::try_from(stat.blocks_available()).unwrap(), + Statvfs::Mock(stat) => stat.blocks_available, + } + } + + pub fn fragment_size(&self) -> u64 { + match self { + Statvfs::Real(stat) => stat.fragment_size(), + Statvfs::Mock(stat) => stat.fragment_size, + } + } + + pub fn block_size(&self) -> u64 { + match self { + Statvfs::Real(stat) => stat.block_size(), + Statvfs::Mock(stat) => stat.block_size, + } + } +} + +pub mod mock { + use anyhow::Context; + use regex::Regex; + use std::path::Path; + use tracing::log::info; + + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + #[serde(tag = "type")] + pub enum Behavior { + Success { + blocksize: u64, + total_blocks: u64, + name_filter: Option, + }, + Failure { + mocked_error: MockedError, + }, + } + + #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + #[allow(clippy::upper_case_acronyms)] + pub enum MockedError { + EIO, + } + + impl From for nix::Error { + fn from(e: MockedError) -> Self { + match e { + MockedError::EIO => nix::Error::EIO, + } + } + } + + pub fn get(tenants_dir: &Path, behavior: &Behavior) -> nix::Result { + info!("running mocked statvfs"); + + match behavior { + Behavior::Success { + blocksize, + total_blocks, + ref name_filter, + } => { + let used_bytes = walk_dir_disk_usage(tenants_dir, name_filter.as_deref()).unwrap(); + + // round it up to the nearest block multiple + let used_blocks = (used_bytes + (blocksize - 1)) / blocksize; + + if used_blocks > *total_blocks { + panic!( + "mocking error: used_blocks > total_blocks: {used_blocks} > {total_blocks}" + ); + } + + let avail_blocks = total_blocks - used_blocks; + + Ok(Statvfs { + blocks: *total_blocks, + blocks_available: avail_blocks, + fragment_size: *blocksize, + block_size: *blocksize, + }) + } + Behavior::Failure { mocked_error } => Err((*mocked_error).into()), + } + } + + fn walk_dir_disk_usage(path: &Path, name_filter: Option<&Regex>) -> anyhow::Result { + let mut total = 0; + for entry in walkdir::WalkDir::new(path) { + let entry = entry?; + if !entry.file_type().is_file() { + continue; + } + if !name_filter + .as_ref() + .map(|filter| filter.is_match(entry.file_name().to_str().unwrap())) + .unwrap_or(true) + { + continue; + } + total += entry + .metadata() + .with_context(|| format!("get metadata 
of {:?}", entry.path()))? + .len(); + } + Ok(total) + } + + pub struct Statvfs { + pub blocks: u64, + pub blocks_available: u64, + pub fragment_size: u64, + pub block_size: u64, + } +} diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 44b1bbb06d..82aebc6c07 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -234,6 +234,9 @@ pub enum TaskKind { // Eviction. One per timeline. Eviction, + /// See [`crate::disk_usage_eviction_task`]. + DiskUsageEviction, + // Initial logical size calculation InitialLogicalSizeCalculation, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2c5226e5bc..7fac7d2ac0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -95,7 +95,7 @@ mod timeline; pub mod size; -pub use timeline::{PageReconstructError, Timeline}; +pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline}; // re-export this function so that page_cache.rs can use it. pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file; @@ -1706,6 +1706,13 @@ impl Tenant { .unwrap_or(self.conf.default_tenant_conf.trace_read_requests) } + pub fn get_min_resident_size_override(&self) -> Option { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .min_resident_size_override + .or(self.conf.default_tenant_conf.min_resident_size_override) + } + pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { *self.tenant_conf.write().unwrap() = new_tenant_conf; } @@ -2783,6 +2790,7 @@ pub mod harness { max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag), trace_read_requests: Some(tenant_conf.trace_read_requests), eviction_policy: Some(tenant_conf.eviction_policy), + min_resident_size_override: tenant_conf.min_resident_size_override, } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 48cb6be121..cdabb23a7b 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -92,6 +92,7 @@ pub struct TenantConf { pub max_lsn_wal_lag: NonZeroU64, pub trace_read_requests: bool, pub eviction_policy: EvictionPolicy, + pub min_resident_size_override: Option, } /// Same as TenantConf, but this struct preserves the information about @@ -159,6 +160,10 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub eviction_policy: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub min_resident_size_override: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -220,6 +225,9 @@ impl TenantConfOpt { .trace_read_requests .unwrap_or(global_conf.trace_read_requests), eviction_policy: self.eviction_policy.unwrap_or(global_conf.eviction_policy), + min_resident_size_override: self + .min_resident_size_override + .or(global_conf.min_resident_size_override), } } } @@ -251,6 +259,7 @@ impl Default for TenantConf { .expect("cannot parse default max walreceiver Lsn wal lag"), trace_read_requests: false, eviction_policy: EvictionPolicy::NoEviction, + min_resident_size_override: None, } } } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index c36b6121c0..2ee723e7c3 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -121,10 +121,10 @@ struct LayerAccessStatsInner { } #[derive(Debug, Clone, Copy)] -pub(super) struct LayerAccessStatFullDetails { - pub(super) when: SystemTime, - pub(super) task_kind: TaskKind, - pub(super) access_kind: 
LayerAccessKind, +pub(crate) struct LayerAccessStatFullDetails { + pub(crate) when: SystemTime, + pub(crate) task_kind: TaskKind, + pub(crate) access_kind: LayerAccessKind, } #[derive(Clone, Copy, strum_macros::EnumString)] @@ -255,7 +255,7 @@ impl LayerAccessStats { ret } - pub(super) fn most_recent_access_or_residence_event( + fn most_recent_access_or_residence_event( &self, ) -> Either { let locked = self.0.lock().unwrap(); @@ -268,6 +268,13 @@ impl LayerAccessStats { } } } + + pub(crate) fn latest_activity(&self) -> SystemTime { + match self.most_recent_access_or_residence_event() { + Either::Left(mra) => mra.when, + Either::Right(re) => re.timestamp, + } + } } /// Supertrait of the [`Layer`] trait that captures the bare minimum interface diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e1db34ec1b..b40cb05411 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -13,6 +13,7 @@ use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState, }; +use remote_storage::GenericRemoteStorage; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -957,6 +958,25 @@ impl Timeline { } } + /// Evict a batch of layers. + /// + /// GenericRemoteStorage reference is required as a witness[^witness_article] for "remote storage is configured." + /// + /// [^witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html + pub async fn evict_layers( + &self, + _: &GenericRemoteStorage, + layers_to_evict: &[Arc], + cancel: CancellationToken, + ) -> anyhow::Result>>> { + let remote_client = self.remote_client.clone().expect( + "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient", + ); + + self.evict_layer_batch(&remote_client, layers_to_evict, cancel) + .await + } + /// Evict multiple layers at once, continuing through errors. 
/// /// Try to evict the given `layers_to_evict` by @@ -994,6 +1014,15 @@ impl Timeline { // now lock out layer removal (compaction, gc, timeline deletion) let layer_removal_guard = self.layer_removal_cs.lock().await; + { + // to avoid racing with detach and delete_timeline + let state = self.current_state(); + anyhow::ensure!( + state == TimelineState::Active, + "timeline is not active but {state:?}" + ); + } + // start the batch update let mut layer_map = self.layers.write().unwrap(); let mut batch_updates = layer_map.batch_update(); @@ -1027,6 +1056,8 @@ impl Timeline { use super::layer_map::Replacement; if local_layer.is_remote_layer() { + // TODO(issue #3851): consider returning an err here instead of false, + // which is the same out the match later return Ok(false); } @@ -4012,6 +4043,67 @@ impl Timeline { } } +pub struct DiskUsageEvictionInfo { + /// Timeline's largest layer (remote or resident) + pub max_layer_size: Option, + /// Timeline's resident layers + pub resident_layers: Vec, +} + +pub struct LocalLayerInfoForDiskUsageEviction { + pub layer: Arc, + pub last_activity_ts: SystemTime, +} + +impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it + // having to allocate a string to this is bad, but it will rarely be formatted + let ts = chrono::DateTime::::from(self.last_activity_ts); + let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true); + f.debug_struct("LocalLayerInfoForDiskUsageEviction") + .field("layer", &self.layer) + .field("last_activity", &ts) + .finish() + } +} + +impl LocalLayerInfoForDiskUsageEviction { + pub fn file_size(&self) -> u64 { + self.layer.file_size() + } +} + +impl Timeline { + pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().unwrap(); + + let mut max_layer_size: Option = None; + let mut resident_layers = Vec::new(); + + for l in layers.iter_historic_layers() { + let file_size = l.file_size(); + max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); + + if l.is_remote_layer() { + continue; + } + + let last_activity_ts = l.access_stats().latest_activity(); + + resident_layers.push(LocalLayerInfoForDiskUsageEviction { + layer: l, + last_activity_ts, + }); + } + + DiskUsageEvictionInfo { + max_layer_size, + resident_layers, + } + } +} + type TraversalPathItem = ( ValueReconstructResult, Lsn, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 107cd89b90..cf799a8808 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -20,7 +20,6 @@ use std::{ time::{Duration, SystemTime}, }; -use either::Either; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; @@ -185,13 +184,7 @@ impl Timeline { if hist_layer.is_remote_layer() { continue; } - let last_activity_ts = match hist_layer - .access_stats() - .most_recent_access_or_residence_event() - { - Either::Left(mra) => mra.when, - Either::Right(re) => re.timestamp, - }; + let last_activity_ts = hist_layer.access_stats().latest_activity(); let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, Err(_e) => { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9929d3e66b..a232bf8b6d 100644 --- 
a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1220,6 +1220,28 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) return TenantConfig.from_json(res.json()) + def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): + assert "tenant_id" not in config.keys() + res = self.put( + f"http://localhost:{self.port}/v1/tenant/config", + json={**config, "tenant_id": str(tenant_id)}, + ) + self.verbose_error(res) + + def patch_tenant_config_client_side( + self, + tenant_id: TenantId, + inserts: Optional[Dict[str, Any]] = None, + removes: Optional[List[str]] = None, + ): + current = self.tenant_config(tenant_id).tenant_specific_overrides + if inserts is not None: + current.update(inserts) + if removes is not None: + for key in removes: + del current[key] + self.set_tenant_config(tenant_id, current) + def tenant_size(self, tenant_id: TenantId) -> int: return self.tenant_size_and_modelinputs(tenant_id)[0] @@ -1536,6 +1558,18 @@ class PageserverHttpClient(requests.Session): for layer in info.historic_layers: self.evict_layer(tenant_id, timeline_id, layer.layer_file_name) + def disk_usage_eviction_run(self, request: dict[str, Any]): + res = self.put( + f"http://localhost:{self.port}/v1/disk_usage_eviction/run", + json=request, + ) + self.verbose_error(res) + return res.json() + + def tenant_break(self, tenant_id: TenantId): + res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break") + self.verbose_error(res) + @dataclass class TenantConfig: diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py new file mode 100644 index 0000000000..6ed09734fe --- /dev/null +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -0,0 +1,541 @@ +import shutil +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Tuple + +import pytest +import toml +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + LocalFsStorage, + NeonEnv, + NeonEnvBuilder, + PageserverHttpClient, + PgBin, + RemoteStorageKind, + wait_for_last_flush_lsn, + wait_for_upload_queue_empty, + wait_until, +) +from fixtures.types import Lsn, TenantId, TimelineId + +GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy" + + +@pytest.mark.parametrize("config_level_override", [None, 400]) +def test_min_resident_size_override_handling( + neon_env_builder: NeonEnvBuilder, config_level_override: int +): + env = neon_env_builder.init_start() + ps_http = env.pageserver.http_client() + + def assert_config(tenant_id, expect_override, expect_effective): + config = ps_http.tenant_config(tenant_id) + assert config.tenant_specific_overrides.get("min_resident_size_override") == expect_override + assert config.effective_config.get("min_resident_size_override") == expect_effective + + def assert_overrides(tenant_id, default_tenant_conf_value): + ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 200}) + assert_config(tenant_id, 200, 200) + + ps_http.set_tenant_config(tenant_id, {"min_resident_size_override": 0}) + assert_config(tenant_id, 0, 0) + + ps_http.set_tenant_config(tenant_id, {}) + assert_config(tenant_id, None, default_tenant_conf_value) + + env.pageserver.stop() + if config_level_override is not None: + env.pageserver.start( + overrides=( + "--pageserver-config-override=tenant_config={ min_resident_size_override = " + + str(config_level_override) + + " 
}", + ) + ) + else: + env.pageserver.start() + + tenant_id, _ = env.neon_cli.create_tenant() + assert_overrides(tenant_id, config_level_override) + + # Also ensure that specifying the parameter to create_tenant works, in addition to http-level reconfig. + tenant_id, _ = env.neon_cli.create_tenant(conf={"min_resident_size_override": "100"}) + assert_config(tenant_id, 100, 100) + ps_http.set_tenant_config(tenant_id, {}) + assert_config(tenant_id, None, config_level_override) + + +@dataclass +class EvictionEnv: + timelines: list[Tuple[TenantId, TimelineId]] + neon_env: NeonEnv + pg_bin: PgBin + pageserver_http: PageserverHttpClient + layer_size: int + pgbench_init_lsns: Dict[TenantId, Lsn] + + def timelines_du(self) -> Tuple[int, int, int]: + return poor_mans_du(self.neon_env, [(tid, tlid) for tid, tlid in self.timelines]) + + def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]: + return { + (tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)])[0] + for tid, tlid in self.timelines + } + + def warm_up_tenant(self, tenant_id: TenantId): + """ + Start a read-only compute at the LSN after pgbench -i, and run pgbench -S against it. + This assumes that the tenant is still at the state after pgbench -i. + """ + lsn = self.pgbench_init_lsns[tenant_id] + with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg: + self.pg_bin.run(["pgbench", "-S", pg.connstr()]) + + def pageserver_start_with_disk_usage_eviction( + self, period, max_usage_pct, min_avail_bytes, mock_behavior + ): + disk_usage_config = { + "period": period, + "max_usage_pct": max_usage_pct, + "min_avail_bytes": min_avail_bytes, + "mock_statvfs": mock_behavior, + } + + enc = toml.TomlEncoder() + + self.neon_env.pageserver.start( + overrides=( + "--pageserver-config-override=disk_usage_based_eviction=" + + enc.dump_inline_table(disk_usage_config).replace("\n", " "), + ), + ) + + def statvfs_called(): + assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*") + + wait_until(10, 1, statvfs_called) + + +@pytest.fixture +def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv: + """ + Creates two tenants, one somewhat larger than the other. + """ + + log.info(f"setting up eviction_env for test {request.node.name}") + + neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}") + + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # allow because we are invoking this manually; we always warn when executing disk usage based eviction + env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*") + + # remove the initial tenant + ## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865 + assert env.initial_timeline + wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, env.initial_timeline) + pageserver_http.tenant_detach(env.initial_tenant) + assert isinstance(env.remote_storage, LocalFsStorage) + tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant) + assert tenant_remote_storage.is_dir() + shutil.rmtree(tenant_remote_storage) + env.initial_tenant = TenantId("0" * 32) + env.initial_timeline = None + + # Choose a small layer_size so that we can use low pgbench_scales and still get a large count of layers. + # Large count of layers and small layer size is good for testing because it makes evictions predictable.
+ # Predictable in the sense that many layer evictions will be required to reach the eviction target, because + # each eviction only makes small progress. That means little overshoot, and thereby stable asserts. + pgbench_scales = [4, 6] + layer_size = 5 * 1024**2 + + pgbench_init_lsns = {} + + timelines = [] + for scale in pgbench_scales: + tenant_id, timeline_id = env.neon_cli.create_tenant( + conf={ + "gc_period": "0s", + "compaction_period": "0s", + "checkpoint_distance": f"{layer_size}", + "image_creation_threshold": "100", + "compaction_target_size": f"{layer_size}", + } + ) + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + pg_bin.run(["pgbench", "-i", f"-s{scale}", pg.connstr()]) + wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + timelines.append((tenant_id, timeline_id)) + + # stop the safekeepers to avoid on-demand downloads caused by + # initial logical size calculation triggered by walreceiver connection status + # when we restart the pageserver process in any of the tests + env.neon_cli.safekeeper_stop() + + # after stopping the safekeepers, we know that no new WAL will be coming in + for tenant_id, timeline_id in timelines: + pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + wait_for_upload_queue_empty(env.pageserver, tenant_id, timeline_id) + tl_info = pageserver_http.timeline_detail(tenant_id, timeline_id) + assert tl_info["last_record_lsn"] == tl_info["disk_consistent_lsn"] + assert tl_info["disk_consistent_lsn"] == tl_info["remote_consistent_lsn"] + pgbench_init_lsns[tenant_id] = Lsn(tl_info["last_record_lsn"]) + + layers = pageserver_http.layer_map_info(tenant_id, timeline_id) + log.info(f"{layers}") + assert ( + len(layers.historic_layers) >= 10 + ), "evictions happen at layer granularity, but we often assert at byte-granularity" + + eviction_env = EvictionEnv( + timelines=timelines, + neon_env=env, + pageserver_http=pageserver_http, + layer_size=layer_size, + pg_bin=pg_bin, + pgbench_init_lsns=pgbench_init_lsns, + ) + + return eviction_env + + +def test_broken_tenants_are_skipped(eviction_env: EvictionEnv): + env = eviction_env + + env.neon_env.pageserver.allowed_errors.append( + r".* Changing Active tenant to Broken state, reason: broken from test" + ) + broken_tenant_id, broken_timeline_id = env.timelines[0] + env.pageserver_http.tenant_break(broken_tenant_id) + + healthy_tenant_id, healthy_timeline_id = env.timelines[1] + + broken_size_pre, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)]) + healthy_size_pre, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)]) + + # try to evict everything, then validate that broken tenant wasn't touched + target = broken_size_pre + healthy_size_pre + + response = env.pageserver_http.disk_usage_eviction_run({"evict_bytes": target}) + log.info(f"{response}") + + broken_size_post, _, _ = poor_mans_du(env.neon_env, [(broken_tenant_id, broken_timeline_id)]) + healthy_size_post, _, _ = poor_mans_du(env.neon_env, [(healthy_tenant_id, healthy_timeline_id)]) + + assert broken_size_pre == broken_size_post, "broken tenant should not be touched" + assert healthy_size_post < healthy_size_pre + assert healthy_size_post == 0 + env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) + + +def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv): + """ + Basic test to ensure that we evict enough to relieve pressure. 
+ """ + env = eviction_env + pageserver_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + + target = total_on_disk // 2 + + response = pageserver_http.disk_usage_eviction_run({"evict_bytes": target}) + log.info(f"{response}") + + (later_total_on_disk, _, _) = env.timelines_du() + + actual_change = total_on_disk - later_total_on_disk + + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "must evict more than half" + assert ( + response["Finished"]["assumed"]["projected_after"]["freed_bytes"] >= actual_change + ), "report accurately evicted bytes" + assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected" + + +def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv): + """ + Override the tenant's min_resident_size and ensure that it is respected by eviction. + """ + env = eviction_env + ps_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + du_by_timeline = env.du_by_timeline() + log.info("du_by_timeline: %s", du_by_timeline) + + assert len(du_by_timeline) == 2, "this test assumes two tenants" + large_tenant = max(du_by_timeline, key=du_by_timeline.__getitem__) + small_tenant = min(du_by_timeline, key=du_by_timeline.__getitem__) + assert du_by_timeline[large_tenant] > du_by_timeline[small_tenant] + assert ( + du_by_timeline[large_tenant] - du_by_timeline[small_tenant] > 5 * env.layer_size + ), "ensure this test will do more than 1 eviction" + + # Give the larger tenant a haircut while preventing the smaller tenant from getting one. + # To prevent the smaller one from getting a haircut, we set min_resident_size to its current size. + # To ensure the larger tenant is getting a haircut, any non-zero `target` will do. + min_resident_size = du_by_timeline[small_tenant] + target = 1 + assert ( + du_by_timeline[large_tenant] > min_resident_size + ), "ensure the larger tenant will get a haircut" + ps_http.patch_tenant_config_client_side( + small_tenant[0], {"min_resident_size_override": min_resident_size} + ) + ps_http.patch_tenant_config_client_side( + large_tenant[0], {"min_resident_size_override": min_resident_size} + ) + + # Make the large tenant more recently used. An incorrect implementation would try to evict + the smaller tenant completely first, before turning to the larger tenant, + since the smaller tenant's layers are least-recently-used.
+ env.warm_up_tenant(large_tenant[0]) + + # do one run + response = ps_http.disk_usage_eviction_run({"evict_bytes": target}) + log.info(f"{response}") + + time.sleep(1) # give log time to flush + assert not env.neon_env.pageserver.log_contains( + GLOBAL_LRU_LOG_LINE, + ), "this test is pointless if it fell back to global LRU" + + (later_total_on_disk, _, _) = env.timelines_du() + later_du_by_timeline = env.du_by_timeline() + log.info("later_du_by_timeline: %s", later_du_by_timeline) + + actual_change = total_on_disk - later_total_on_disk + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "eviction must always evict more than target" + assert ( + response["Finished"]["assumed"]["projected_after"]["freed_bytes"] >= actual_change + ), "report accurately evicted bytes" + assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected" + + assert ( + later_du_by_timeline[small_tenant] == du_by_timeline[small_tenant] + ), "small tenant sees no haircut" + assert ( + later_du_by_timeline[large_tenant] < du_by_timeline[large_tenant] + ), "large tenant gets a haircut" + assert du_by_timeline[large_tenant] - later_du_by_timeline[large_tenant] >= target + + +def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv): + """ + If we can't relieve pressure using tenant_min_resident_size-respecting eviction, + we should continue to evict layers following global LRU. + """ + env = eviction_env + ps_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + target = total_on_disk + + response = ps_http.disk_usage_eviction_run({"evict_bytes": target}) + log.info(f"{response}") + + (later_total_on_disk, _, _) = env.timelines_du() + actual_change = total_on_disk - later_total_on_disk + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "eviction must always evict more than target" + + time.sleep(1) # give log time to flush + assert env.neon_env.pageserver.log_contains(GLOBAL_LRU_LOG_LINE) + env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE) + + +def test_partial_evict_tenant(eviction_env: EvictionEnv): + """ + Warm up a tenant, then build up pressure to cause evictions in both. + We expect + * the default min resident size to be respected (largest layer file size) + * the warmed-up tenant's layers above min resident size to be evicted after the cold tenant's. + """ + env = eviction_env + ps_http = env.pageserver_http + + (total_on_disk, _, _) = env.timelines_du() + du_by_timeline = env.du_by_timeline() + + # pick any tenant + [our_tenant, other_tenant] = list(du_by_timeline.keys()) + (tenant_id, timeline_id) = our_tenant + + # make our tenant more recently used than the other one + env.warm_up_tenant(tenant_id) + + # Build up enough pressure to require evictions from both tenants, + but not enough to fall into global LRU.
+ # So, set target to all occupied space, except 2*env.layer_size per tenant + target = ( + du_by_timeline[other_tenant] + (du_by_timeline[our_tenant] // 2) - 2 * 2 * env.layer_size + ) + response = ps_http.disk_usage_eviction_run({"evict_bytes": target}) + log.info(f"{response}") + + (later_total_on_disk, _, _) = env.timelines_du() + actual_change = total_on_disk - later_total_on_disk + assert 0 <= actual_change, "nothing can load layers during this test" + assert actual_change >= target, "eviction must always evict more than target" + + later_du_by_timeline = env.du_by_timeline() + for tenant, later_tenant_usage in later_du_by_timeline.items(): + assert ( + later_tenant_usage < du_by_timeline[tenant] + ), "all tenants should have lost some layers" + + assert ( + later_du_by_timeline[our_tenant] > 0.5 * du_by_timeline[our_tenant] + ), "our warmed up tenant should be at about half capacity, part 1" + assert ( + # We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room. + # So, check for up to 3 here. + later_du_by_timeline[our_tenant] + < 0.5 * du_by_timeline[our_tenant] + 3 * env.layer_size + ), "our warmed up tenant should be at about half capacity, part 2" + assert ( + later_du_by_timeline[other_tenant] < 2 * env.layer_size + ), "the other tenant should be evicted to its min_resident_size, i.e., max layer file size" + + +def poor_mans_du( + env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]] +) -> Tuple[int, int, int]: + """ + Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples; + this could be done via the layers endpoint just as well. + """ + total_on_disk = 0 + largest_layer = 0 + smallest_layer = None + for tenant_id, timeline_id in timelines: + dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) + assert dir.exists(), f"timeline dir does not exist: {dir}" + sum = 0 + for file in dir.iterdir(): + if "__" not in file.name: + continue + size = file.stat().st_size + sum += size + largest_layer = max(largest_layer, size) + if smallest_layer: + smallest_layer = min(smallest_layer, size) + else: + smallest_layer = size + log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}") + + log.info(f"{tenant_id}/{timeline_id}: sum {sum}") + total_on_disk += sum + + assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0 + return (total_on_disk, largest_layer, smallest_layer or 0) + + +def test_statvfs_error_handling(eviction_env: EvictionEnv): + """ + We should log an error when statvfs fails. + """ + env = eviction_env + env.neon_env.pageserver.stop() + env.pageserver_start_with_disk_usage_eviction( + period="1s", + max_usage_pct=90, + min_avail_bytes=0, + mock_behavior={ + "type": "Failure", + "mocked_error": "EIO", + }, + ) + + assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO") + env.neon_env.pageserver.allowed_errors.append(".*statvfs failed.*EIO") + + +def test_statvfs_pressure_usage(eviction_env: EvictionEnv): + """ + If statvfs data shows 100% usage, the eviction task will drive it down to + the configured max_usage_pct.
+ """ + env = eviction_env + + env.neon_env.pageserver.stop() + + # make it seem like we're at 100% utilization by setting total bytes to the used bytes + total_size, _, _ = env.timelines_du() + blocksize = 512 + total_blocks = (total_size + (blocksize - 1)) // blocksize + + env.pageserver_start_with_disk_usage_eviction( + period="1s", + max_usage_pct=33, + min_avail_bytes=0, + mock_behavior={ + "type": "Success", + "blocksize": blocksize, + "total_blocks": total_blocks, + # Only count layer files towards used bytes in the mock_statvfs. + # This avoids accounting for metadata files & tenant conf in the tests. + "name_filter": ".*__.*", + }, + ) + + def relieved_log_message(): + assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") + + wait_until(10, 1, relieved_log_message) + + post_eviction_total_size, _, _ = env.timelines_du() + + assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage" + + +def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv): + """ + If statvfs data shows 100% usage, the eviction task will drive it down to + at least the configured min_avail_bytes. + """ + env = eviction_env + + env.neon_env.pageserver.stop() + + # make it seem like we're at 100% utilization by setting total bytes to the used bytes + total_size, _, _ = env.timelines_du() + blocksize = 512 + total_blocks = (total_size + (blocksize - 1)) // blocksize + + min_avail_bytes = total_size // 3 + + env.pageserver_start_with_disk_usage_eviction( + period="1s", + max_usage_pct=100, + min_avail_bytes=min_avail_bytes, + mock_behavior={ + "type": "Success", + "blocksize": blocksize, + "total_blocks": total_blocks, + # Only count layer files towards used bytes in the mock_statvfs. + # This avoids accounting for metadata files & tenant conf in the tests. + "name_filter": ".*__.*", + }, + ) + + def relieved_log_message(): + assert env.neon_env.pageserver.log_contains(".*disk usage pressure relieved") + + wait_until(10, 1, relieved_log_message) + + post_eviction_total_size, _, _ = env.timelines_du() + + assert ( + total_size - post_eviction_total_size >= min_avail_bytes + ), "we requested at least min_avail_bytes worth of free space"
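A note on the response structure the assertions in these tests rely on: they index into the JSON returned by the /v1/disk_usage_eviction/run endpoint. The sketch below is reconstructed only from the keys used in this test file (Finished, assumed, projected_after.freed_bytes, failed.count); it is illustrative and not the endpoint's full schema.

# Illustration only: the parts of the /v1/disk_usage_eviction/run response that
# the tests in this file inspect. The real response may contain more fields.
example_response = {
    "Finished": {
        "assumed": {
            "projected_after": {
                # bytes the pageserver projected it would free via the planned evictions
                "freed_bytes": 123_456_789,
            },
            "failed": {
                # number of layers it attempted but failed to evict
                "count": 0,
            },
        },
    },
}

assert example_response["Finished"]["assumed"]["failed"]["count"] == 0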
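The eviction target in test_partial_evict_tenant compresses a fair bit of reasoning into one expression. Here is the same arithmetic spelled out with made-up layer counts (the fixture's layer_size is 5 MiB); the numbers are purely illustrative.

# Hypothetical resident sizes, in multiples of the fixture's 5 MiB layer_size.
layer_size = 5 * 1024**2
du_other = 40 * layer_size  # the cold tenant
du_ours = 60 * layer_size   # the warmed-up tenant

# Evict (nearly) all of the cold tenant plus about half of the warm tenant,
# leaving two layers of wiggle room per tenant (hence the 2 * 2 * layer_size).
target = du_other + (du_ours // 2) - 2 * 2 * layer_size

assert target == (40 + 30 - 4) * layer_size  # 66 layers' worth of bytes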
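Finally, the two statvfs-pressure tests assert that eviction relieves pressure down to max_usage_pct, respectively until min_avail_bytes are free. The helper below is a rough sketch of that arithmetic under made-up names; the actual computation lives in the pageserver's disk-usage-based eviction task (in Rust) and may differ in detail.

# Hypothetical helper, for illustration only: how many bytes have to be evicted
# so that both thresholds of disk_usage_based_eviction are satisfied.
def bytes_to_evict(total_bytes: int, avail_bytes: int,
                   max_usage_pct: int, min_avail_bytes: int) -> int:
    used = total_bytes - avail_bytes

    # usage threshold: used space may be at most max_usage_pct of the filesystem
    allowed_used = total_bytes * max_usage_pct // 100
    need_for_pct = max(0, used - allowed_used)

    # headroom threshold: at least min_avail_bytes must remain available
    need_for_avail = max(0, min_avail_bytes - avail_bytes)

    # freeing the larger of the two deficits relieves both kinds of pressure
    return max(need_for_pct, need_for_avail)

# test_statvfs_pressure_usage mocks a 100%-full filesystem with max_usage_pct=33,
# so roughly two thirds of the bytes must go:
total = 300 * 512  # made-up: 300 blocks of 512 bytes
assert bytes_to_evict(total, 0, max_usage_pct=33, min_avail_bytes=0) == total - total * 33 // 100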