diff --git a/Cargo.lock b/Cargo.lock index 3474211ac6..e5f39658a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6204,6 +6204,7 @@ dependencies = [ "itertools 0.10.5", "jsonwebtoken", "metrics", + "nix 0.30.1", "once_cell", "pageserver_api", "parking_lot 0.12.1", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 539e931983..56822b5c25 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -72,6 +72,7 @@ http-utils.workspace = true utils.workspace = true wal_decoder.workspace = true env_logger.workspace = true +nix.workspace = true workspace_hack.workspace = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 79cf2f9149..2ec541b6f0 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -17,8 +17,9 @@ use http_utils::tls_certs::ReloadingCertificateResolver; use metrics::set_build_info_metric; use remote_storage::RemoteStorageConfig; use safekeeper::defaults::{ - DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT, - DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, + DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, + DEFAULT_GLOBAL_DISK_CHECK_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, + DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE, @@ -42,6 +43,12 @@ use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{pid_file, project_build_tag, project_git_version, tcp_listener}; +use safekeeper::hadron::{ + GLOBAL_DISK_LIMIT_EXCEEDED, get_filesystem_capacity, get_filesystem_usage, +}; +use safekeeper::metrics::GLOBAL_DISK_UTIL_CHECK_SECONDS; +use std::sync::atomic::Ordering; + #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; @@ -256,6 +263,15 @@ struct Args { /* BEGIN_HADRON */ #[arg(long)] enable_pull_timeline_on_startup: bool, + /// How often to scan entire data-dir for total disk usage + #[arg(long, value_parser=humantime::parse_duration, default_value = DEFAULT_GLOBAL_DISK_CHECK_INTERVAL)] + global_disk_check_interval: Duration, + /// The portion of the filesystem capacity that can be used by all timelines. + /// A circuit breaker will trip and reject all WAL writes if the total usage + /// exceeds this ratio. + /// Set to 0 to disable the global disk usage limit. + #[arg(long, default_value_t = DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO)] + max_global_disk_usage_ratio: f64, /* END_HADRON */ } @@ -444,6 +460,8 @@ async fn main() -> anyhow::Result<()> { advertise_pg_addr_tenant_only: None, enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup, hcc_base_url: None, + global_disk_check_interval: args.global_disk_check_interval, + max_global_disk_usage_ratio: args.max_global_disk_usage_ratio, /* END_HADRON */ }); @@ -618,6 +636,49 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .map(|res| ("Timeline map housekeeping".to_owned(), res)); tasks_handles.push(Box::pin(timeline_housekeeping_handle)); + /* BEGIN_HADRON */ + // Spawn global disk usage watcher task, if a global disk usage limit is specified. 
+    let interval = conf.global_disk_check_interval;
+    let data_dir = conf.workdir.clone();
+    // Use the safekeeper data directory to compute filesystem capacity. This only runs once on startup, so
+    // there is little point in continuing if we can't put the proper protections in place.
+    let fs_capacity_bytes = get_filesystem_capacity(data_dir.as_std_path())
+        .expect("Failed to get filesystem capacity for data directory");
+    let limit: u64 = (conf.max_global_disk_usage_ratio * fs_capacity_bytes as f64) as u64;
+    if limit > 0 {
+        let disk_usage_watch_handle = BACKGROUND_RUNTIME
+            .handle()
+            .spawn(async move {
+                // Use a Tokio interval to preserve a fixed cadence between filesystem utilization checks
+                let mut ticker = tokio::time::interval(interval);
+                ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+                loop {
+                    ticker.tick().await;
+                    let data_dir_clone = data_dir.clone();
+                    let check_start = Instant::now();
+
+                    let usage = tokio::task::spawn_blocking(move || {
+                        get_filesystem_usage(data_dir_clone.as_std_path())
+                    })
+                    .await
+                    .unwrap_or(0);
+
+                    let elapsed = check_start.elapsed().as_secs_f64();
+                    GLOBAL_DISK_UTIL_CHECK_SECONDS.observe(elapsed);
+                    if usage > limit {
+                        warn!(
+                            "Global disk usage exceeded limit. Usage: {} bytes, limit: {} bytes",
+                            usage, limit
+                        );
+                    }
+                    GLOBAL_DISK_LIMIT_EXCEEDED.store(usage > limit, Ordering::Relaxed);
+                }
+            })
+            .map(|res| ("Global disk usage watcher".to_string(), res));
+        tasks_handles.push(Box::pin(disk_usage_watch_handle));
+    }
+    /* END_HADRON */
     if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
         let wal_service_handle = current_thread_rt
             .as_ref()
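Note on the limit calculation above: the byte limit is derived once at startup as max_global_disk_usage_ratio times the capacity of the data directory's filesystem, and a ratio of 0 (the default) disables the watcher entirely. A minimal sketch of that arithmetic, using a hypothetical 100 GiB volume (illustration only, not part of the diff):

    // Sketch only: how `limit` is derived in start_safekeeper above; the numbers are made up.
    fn example_limit_calculation() {
        let fs_capacity_bytes: u64 = 100 * 1024 * 1024 * 1024; // hypothetical 100 GiB filesystem
        let max_global_disk_usage_ratio: f64 = 0.8; // e.g. --max-global-disk-usage-ratio=0.8
        let limit = (max_global_disk_usage_ratio * fs_capacity_bytes as f64) as u64;
        assert_eq!(limit, 80 * 1024 * 1024 * 1024); // the breaker trips once usage exceeds 80 GiB
        // With the default ratio of 0.0 the limit is 0 and the watcher task is never spawned.
    }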
diff --git a/safekeeper/src/hadron.rs b/safekeeper/src/hadron.rs
index b41bf2c3da..8c6a912166 100644
--- a/safekeeper/src/hadron.rs
+++ b/safekeeper/src/hadron.rs
@@ -1,12 +1,17 @@
+use once_cell::sync::Lazy;
 use pem::Pem;
 use safekeeper_api::models::PullTimelineRequest;
-use std::{collections::HashMap, env::VarError, net::IpAddr, sync::Arc, time::Duration};
+use std::{
+    collections::HashMap, env::VarError, net::IpAddr, sync::Arc, sync::atomic::AtomicBool,
+    time::Duration,
+};
 use tokio::time::sleep;
 use tokio_util::sync::CancellationToken;
 use url::Url;
-use utils::{backoff, id::TenantTimelineId, ip_address};
+use utils::{backoff, critical_timeline, id::TenantTimelineId, ip_address};
+
+use anyhow::{Result, anyhow};
 
-use anyhow::Result;
 use pageserver_api::controller_api::{
     AvailabilityZone, NodeRegisterRequest, SafekeeperTimeline, SafekeeperTimelinesResponse,
 };
@@ -346,6 +351,70 @@ pub async fn hcc_pull_timelines(
     Ok(())
 }
 
+/// True if the last background scan found total usage > limit.
+pub static GLOBAL_DISK_LIMIT_EXCEEDED: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));
+
+/// Returns filesystem usage in bytes for the filesystem containing the given path.
+// Need to suppress the clippy::unnecessary_cast warning because the casts on the block count and the
+// block size are required on macOS (they are 32-bit integers on macOS, apparently).
+#[allow(clippy::unnecessary_cast)]
+pub fn get_filesystem_usage(path: &std::path::Path) -> u64 {
+    // Allow overriding disk usage via failpoint for tests
+    fail::fail_point!("sk-global-disk-usage", |val| {
+        // val is Option<String>; parse the payload if present
+        val.and_then(|s| s.parse::<u64>().ok()).unwrap_or(0)
+    });
+
+    // Call statvfs(3) for filesystem usage
+    use nix::sys::statvfs::statvfs;
+    match statvfs(path) {
+        Ok(stat) => {
+            // fragment size (f_frsize) if non-zero else block size (f_bsize)
+            let frsize = stat.fragment_size();
+            let blocksz = if frsize > 0 {
+                frsize
+            } else {
+                stat.block_size()
+            };
+            // used blocks = total blocks - blocks available to unprivileged users
+            let used_blocks = stat.blocks().saturating_sub(stat.blocks_available());
+            used_blocks as u64 * blocksz as u64
+        }
+        Err(e) => {
+            // The global disk usage watcher isn't associated with a tenant or timeline, so we just
+            // pass placeholder (all-zero) tenant and timeline IDs to the critical_timeline!() macro.
+            let placeholder_ttid = TenantTimelineId::empty();
+            critical_timeline!(
+                placeholder_ttid.tenant_id,
+                placeholder_ttid.timeline_id,
+                "Global disk usage watcher failed to read filesystem usage: {:?}",
+                e
+            );
+            0
+        }
+    }
+}
+
+/// Returns the total capacity, in bytes, of the filesystem containing the given path.
+#[allow(clippy::unnecessary_cast)]
+pub fn get_filesystem_capacity(path: &std::path::Path) -> Result<u64> {
+    // Call statvfs(3) for filesystem stats
+    use nix::sys::statvfs::statvfs;
+    match statvfs(path) {
+        Ok(stat) => {
+            // fragment size (f_frsize) if non-zero else block size (f_bsize)
+            let frsize = stat.fragment_size();
+            let blocksz = if frsize > 0 {
+                frsize
+            } else {
+                stat.block_size()
+            };
+            Ok(stat.blocks() as u64 * blocksz as u64)
+        }
+        Err(e) => Err(anyhow!("Failed to read filesystem capacity: {:?}", e)),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
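Note on the statvfs arithmetic above: get_filesystem_usage subtracts blocks_available (f_bavail, the unprivileged view) rather than blocks_free (f_bfree), so blocks reserved for root count as used and the reported usage can be slightly higher than what df shows in its "Used" column. A small worked example with made-up statvfs values (illustration only, not part of the diff):

    // Sketch only: the same arithmetic as get_filesystem_usage/get_filesystem_capacity, with fake numbers.
    fn example_statvfs_accounting() {
        let fragment_size: u64 = 4096; // f_frsize
        let blocks: u64 = 26_214_400; // f_blocks: 100 GiB worth of 4 KiB fragments
        let blocks_available: u64 = 5_242_880; // f_bavail: 20 GiB left for unprivileged writers
        let capacity = blocks * fragment_size; // as in get_filesystem_capacity
        let used = (blocks - blocks_available) * fragment_size; // as in get_filesystem_usage
        assert_eq!(capacity, 100 * 1024 * 1024 * 1024);
        assert_eq!(used, 80 * 1024 * 1024 * 1024);
    }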
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index a0ee2facb5..c9d8e7d3b0 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -33,11 +33,13 @@ use utils::id::{TenantId, TenantTimelineId, TimelineId};
 use utils::lsn::Lsn;
 
 use crate::debug_dump::TimelineDigestRequest;
+use crate::hadron::{get_filesystem_capacity, get_filesystem_usage};
 use crate::safekeeper::TermLsn;
 use crate::timelines_global_map::DeleteOrExclude;
 use crate::{
     GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
 };
+use serde_json::json;
 
 /// Healthcheck handler.
 async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -127,6 +129,21 @@ async fn utilization_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, utilization)
 }
 
+/// Returns filesystem capacity and current utilization for the safekeeper data directory.
+async fn filesystem_usage_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permission(&request, None)?;
+    let conf = get_conf(&request);
+    let path = conf.workdir.as_std_path();
+    let capacity = get_filesystem_capacity(path).map_err(ApiError::InternalServerError)?;
+    let usage = get_filesystem_usage(path);
+    let resp = json!({
+        "data_dir": path,
+        "capacity_bytes": capacity,
+        "usage_bytes": usage,
+    });
+    json_response(StatusCode::OK, resp)
+}
+
 /// List all (not deleted) timelines.
 /// Note: it is possible to do the same with debug_dump.
 async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -730,6 +747,11 @@ pub fn make_router(
             })
         })
         .get("/v1/utilization", |r| request_span(r, utilization_handler))
+        /* BEGIN_HADRON */
+        .get("/v1/debug/filesystem_usage", |r| {
+            request_span(r, filesystem_usage_handler)
+        })
+        /* END_HADRON */
        .delete("/v1/tenant/:tenant_id", |r| {
             request_span(r, tenant_delete_handler)
         })
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 02533b804d..c6f9cc29e5 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -50,6 +50,7 @@ pub mod wal_storage;
 pub mod test_utils;
 
 mod timelines_global_map;
+
 use std::sync::Arc;
 pub use timelines_global_map::GlobalTimelines;
@@ -83,6 +84,10 @@ pub mod defaults {
     pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
     pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
     pub const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
+
+    // Global disk watcher defaults
+    pub const DEFAULT_GLOBAL_DISK_CHECK_INTERVAL: &str = "60s";
+    pub const DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO: f64 = 0.0;
 }
 
 #[derive(Debug, Clone)]
@@ -116,6 +121,10 @@ pub struct SafeKeeperConf {
     /* BEGIN_HADRON */
     pub max_reelect_offloader_lag_bytes: u64,
     pub max_timeline_disk_usage_bytes: u64,
+    /// How often to check the working directory's filesystem for total disk usage.
+    pub global_disk_check_interval: Duration,
+    /// The portion of the filesystem capacity that can be used by all timelines.
+    pub max_global_disk_usage_ratio: f64,
     /* END_HADRON */
     pub backup_parallel_jobs: usize,
     pub wal_backup_enabled: bool,
@@ -173,6 +182,8 @@ impl SafeKeeperConf {
             /* BEGIN_HADRON */
             max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
             max_timeline_disk_usage_bytes: defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
+            global_disk_check_interval: Duration::from_secs(60),
+            max_global_disk_usage_ratio: defaults::DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO,
             /* END_HADRON */
             current_thread_runtime: false,
             walsenders_keep_horizon: false,
@@ -235,10 +246,13 @@ pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
         .expect("Failed to create WAL backup runtime")
 });
 
+/// Hadron: Dedicated runtime for infrequent background tasks.
 pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     tokio::runtime::Builder::new_multi_thread()
-        .thread_name("background worker")
-        .worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense
+        .thread_name("Hadron background worker")
+        // One worker thread is enough, as most of the actual tasks run on blocking threads,
+        // which have their own thread pool.
+        .worker_threads(1)
         .enable_all()
         .build()
         .expect("Failed to create background runtime")
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index e1af51c115..b07852aaee 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -963,3 +963,17 @@ async fn collect_timeline_metrics(global_timelines: Arc<GlobalTimelines>) -> Vec<FullTimelineInfo> {
     }
     res
 }
+
+/* BEGIN_HADRON */
+// Metric reporting the time spent performing each safekeeper filesystem utilization check.
+pub static GLOBAL_DISK_UTIL_CHECK_SECONDS: Lazy = Lazy::new(|| { + // Buckets from 1ms up to 10s + let buckets = vec![0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]; + register_histogram!( + "safekeeper_global_disk_utilization_check_seconds", + "Seconds spent to perform each safekeeper filesystem utilization check", + buckets + ) + .expect("Failed to register safekeeper_global_disk_utilization_check_seconds histogram") +}); +/* END_HADRON */ diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index dbe510a019..a1a0aab9fd 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -29,6 +29,8 @@ use utils::sync::gate::Gate; use crate::metrics::{ FullTimelineInfo, MISC_OPERATION_SECONDS, WAL_STORAGE_LIMIT_ERRORS, WalStorageMetrics, }; + +use crate::hadron::GLOBAL_DISK_LIMIT_EXCEEDED; use crate::rate_limit::RateLimiter; use crate::receive_wal::WalReceivers; use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn}; @@ -1081,6 +1083,11 @@ impl WalResidentTimeline { ); } } + + if GLOBAL_DISK_LIMIT_EXCEEDED.load(Ordering::Relaxed) { + bail!("Global disk usage exceeded limit"); + } + Ok(()) } // END HADRON diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 393df6228e..30d3ab1a87 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -195,6 +195,8 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { enable_pull_timeline_on_startup: false, advertise_pg_addr_tenant_only: None, hcc_base_url: None, + global_disk_check_interval: Duration::from_secs(10), + max_global_disk_usage_ratio: 0.0, /* END_HADRON */ }; diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 22e6d2e1c3..c691087259 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -2788,7 +2788,8 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder): # Wait for the error message to appear in the compute log def error_logged(): - return endpoint.log_contains("WAL storage utilization exceeds configured limit") is not None + if endpoint.log_contains("WAL storage utilization exceeds configured limit") is None: + raise Exception("Expected error message not found in compute log yet") wait_until(error_logged) log.info("Found expected error message in compute log, resuming.") @@ -2822,3 +2823,87 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder): cur.execute("select count(*) from t") # 2000 rows from first insert + 1000 from last insert assert cur.fetchone() == (3000,) + + +def test_global_disk_usage_limit(neon_env_builder: NeonEnvBuilder): + """ + Similar to `test_timeline_disk_usage_limit`, but test that the global disk usage circuit breaker + also works as expected. The test scenario: + 1. Create a timeline and endpoint. + 2. Mock high disk usage via failpoint + 3. Write data to the timeline so that disk usage exceeds the limit. + 4. Verify that the writes hang and the expected error message appears in the compute log. + 5. Mock low disk usage via failpoint + 6. Verify that the hanging writes unblock and we can continue to write as normal. 
+ """ + neon_env_builder.num_safekeepers = 1 + remote_storage_kind = s3_storage() + neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind) + + env = neon_env_builder.init_start() + + env.create_branch("test_global_disk_usage_limit") + endpoint = env.endpoints.create_start("test_global_disk_usage_limit") + + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("create table t2(key int, value text)") + + for sk in env.safekeepers: + sk.stop().start( + extra_opts=["--global-disk-check-interval=1s", "--max-global-disk-usage-ratio=0.8"] + ) + + # Set the failpoint to have the disk usage check return u64::MAX, which definitely exceeds the practical + # limits in the test environment. + for sk in env.safekeepers: + sk.http_client().configure_failpoints( + [("sk-global-disk-usage", "return(18446744073709551615)")] + ) + + # Wait until the global disk usage limit watcher trips the circuit breaker. + def error_logged_in_sk(): + for sk in env.safekeepers: + if sk.log_contains("Global disk usage exceeded limit") is None: + raise Exception("Expected error message not found in safekeeper log yet") + + wait_until(error_logged_in_sk) + + def run_hanging_insert_global(): + with closing(endpoint.connect()) as bg_conn: + with bg_conn.cursor() as bg_cur: + # This should generate more than 1KiB of WAL + bg_cur.execute("insert into t2 select generate_series(1,2000), 'payload'") + + bg_thread_global = threading.Thread(target=run_hanging_insert_global) + bg_thread_global.start() + + def error_logged_in_compute(): + if endpoint.log_contains("Global disk usage exceeded limit") is None: + raise Exception("Expected error message not found in compute log yet") + + wait_until(error_logged_in_compute) + log.info("Found the expected error message in compute log, resuming.") + + time.sleep(2) + assert bg_thread_global.is_alive(), "Global hanging insert unblocked prematurely!" + + # Make the disk usage check always return 0 through the failpoint to simulate the disk pressure easing. + # The SKs should resume accepting WAL writes without restarting. + for sk in env.safekeepers: + sk.http_client().configure_failpoints([("sk-global-disk-usage", "return(0)")]) + + bg_thread_global.join(timeout=120) + assert not bg_thread_global.is_alive(), "Hanging global insert did not complete after restart" + log.info("Global hanging insert unblocked.") + + # Verify that we can continue to write as normal and we don't have obvious data corruption + # following the recovery. + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("insert into t2 select generate_series(2001,3000), 'payload'") + + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("select count(*) from t2") + assert cur.fetchone() == (3000,)
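The Python test above drives the circuit breaker through the sk-global-disk-usage failpoint, so the statvfs-backed helpers themselves are only exercised indirectly (the capacity side is still read from the real filesystem). A direct sanity check could live in the existing tests module of safekeeper/src/hadron.rs; hypothetical sketch, not part of this diff:

    // Hypothetical unit test: usage reported for a path never exceeds the capacity of its filesystem.
    #[test]
    fn filesystem_usage_within_capacity() {
        let path = std::path::Path::new(".");
        let capacity = get_filesystem_capacity(path).expect("statvfs should succeed on the cwd");
        let usage = get_filesystem_usage(path);
        assert!(capacity > 0);
        assert!(usage <= capacity);
    }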