diff --git a/Cargo.lock b/Cargo.lock index 2885f66905..9a367effbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,7 +233,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -244,7 +244,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -881,7 +881,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.28", + "syn 2.0.32", "which", ] @@ -1095,7 +1095,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -1486,7 +1486,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -1497,7 +1497,7 @@ checksum = "29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a" dependencies = [ "darling_core", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -1572,7 +1572,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -1666,7 +1666,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -1920,7 +1920,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -2906,7 +2906,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -3359,7 +3359,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -3566,7 +3566,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1" dependencies = [ "proc-macro2", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -4174,7 +4174,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.28", + "syn 2.0.32", "unicode-ident", ] @@ -4320,6 +4320,7 @@ dependencies = [ "histogram", "itertools", "pageserver", + "pageserver_api", "rand 0.8.5", "remote_storage", "reqwest", @@ -4608,7 +4609,7 @@ checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -4689,7 +4690,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -4956,9 +4957,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.28" +version = "2.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" +checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" dependencies = [ "proc-macro2", "quote", @@ -5088,7 +5089,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -5206,7 +5207,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -5507,7 +5508,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" 
dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] @@ -5793,6 +5794,7 @@ dependencies = [ "serde", "serde_assert", "serde_json", + "serde_path_to_error", "serde_with", "signal-hook", "strum", @@ -5951,7 +5953,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", "wasm-bindgen-shared", ] @@ -5985,7 +5987,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6322,7 +6324,7 @@ dependencies = [ "smallvec", "subtle", "syn 1.0.109", - "syn 2.0.28", + "syn 2.0.32", "time", "time-macros", "tokio", @@ -6384,22 +6386,22 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.3" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a7af71d8643341260a65f89fa60c0eeaa907f34544d8f6d9b0df72f069b5e74" +checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.3" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9731702e2f0617ad526794ae28fbc6f6ca8849b5ba729666c2a5bc4b6ddee2cd" +checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.32", ] [[package]] diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index ccf6f4f2d7..af0414daa2 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -50,6 +50,8 @@ const_format.workspace = true # why is it only here? no other crate should use it, streams are rarely needed. tokio-stream = { version = "0.1.14" } +serde_path_to_error.workspace = true + [dev-dependencies] byteorder.workspace = true bytes.workspace = true diff --git a/libs/utils/src/http/json.rs b/libs/utils/src/http/json.rs index 70e682cb76..7ca62561fe 100644 --- a/libs/utils/src/http/json.rs +++ b/libs/utils/src/http/json.rs @@ -25,8 +25,12 @@ pub async fn json_request_or_empty_body Deserialize<'de>>( if body.remaining() == 0 { return Ok(None); } - serde_json::from_reader(body.reader()) - .context("Failed to parse json request") + + let mut deser = serde_json::de::Deserializer::from_reader(body.reader()); + + serde_path_to_error::deserialize(&mut deser) + // intentionally stringify because the debug version is not helpful in python logs + .map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}")) .map(Some) .map_err(ApiError::BadRequest) } diff --git a/libs/utils/src/timeout.rs b/libs/utils/src/timeout.rs index 11fa417242..56bf57a900 100644 --- a/libs/utils/src/timeout.rs +++ b/libs/utils/src/timeout.rs @@ -2,8 +2,11 @@ use std::time::Duration; use tokio_util::sync::CancellationToken; +#[derive(thiserror::Error, Debug)] pub enum TimeoutCancellableError { + #[error("Timed out")] Timeout, + #[error("Cancelled")] Cancelled, } diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index d32c8ab299..fd09030dbd 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -1,3 +1,6 @@ +//! Links with walproposer, pgcommon, pgport and runs bindgen on walproposer.h +//! to generate Rust bindings for it. 
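For context, a build script of the shape described by that new doc comment typically looks like the following minimal sketch. The header path, library names and the exact `bindgen` invocation are illustrative assumptions, not the actual contents of libs/walproposer/build.rs:

use std::{env, path::PathBuf};

fn main() {
    // Rebuild when the C header changes.
    println!("cargo:rerun-if-changed=walproposer.h");
    // Link the static C libraries named in the doc comment above.
    // (library search paths omitted for brevity)
    println!("cargo:rustc-link-lib=static=walproposer");
    println!("cargo:rustc-link-lib=static=pgcommon");
    println!("cargo:rustc-link-lib=static=pgport");

    // Run bindgen on the header and write the generated Rust bindings to OUT_DIR.
    let bindings = bindgen::Builder::default()
        .header("walproposer.h")
        .generate()
        .expect("failed to generate walproposer bindings");

    let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
    bindings
        .write_to_file(out_path.join("bindings.rs"))
        .expect("failed to write bindings.rs");
}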
+ use std::{env, path::PathBuf, process::Command}; use anyhow::{anyhow, Context}; diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 7f1bbc3b80..77afe1e686 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -1,3 +1,6 @@ +//! A C-Rust shim: defines implementation of C walproposer API, assuming wp +//! callback_data stores Box to some Rust implementation. + #![allow(dead_code)] use std::ffi::CStr; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index cd99cda783..bd63c4d860 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -41,6 +41,8 @@ use crate::{ TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, }; +use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP; + pub mod defaults { use crate::tenant::config::defaults::*; use const_format::formatcp; @@ -61,6 +63,8 @@ pub mod defaults { pub const DEFAULT_LOG_FORMAT: &str = "plain"; + pub const DEFAULT_CONCURRENT_TENANT_WARMUP: usize = 8; + pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = super::ConfigurableSemaphore::DEFAULT_INITIAL.get(); @@ -94,6 +98,7 @@ pub mod defaults { #log_format = '{DEFAULT_LOG_FORMAT}' #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}' +#concurrent_tenant_warmup = '{DEFAULT_CONCURRENT_TENANT_WARMUP}' #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}' #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}' @@ -180,6 +185,11 @@ pub struct PageServerConf { pub log_format: LogFormat, + /// Number of tenants which will be concurrently loaded from remote storage proactively on startup, + /// does not limit tenants loaded in response to client I/O. A lower value implicitly deprioritizes + /// loading such tenants, vs. other work in the system. + pub concurrent_tenant_warmup: ConfigurableSemaphore, + /// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed. pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore, /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`. 
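The semantics of `concurrent_tenant_warmup` are those of a plain counting semaphore: each tenant's startup warmup must hold one of N permits before doing I/O, so at most N tenants warm up concurrently while on-demand activation stays unthrottled. A minimal standalone sketch of that pattern with `tokio::sync::Semaphore` (not the pageserver's `ConfigurableSemaphore` type):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Matches DEFAULT_CONCURRENT_TENANT_WARMUP = 8 from the defaults above.
    let warmup_limit = Arc::new(Semaphore::new(8));

    let mut tasks = Vec::new();
    for tenant in 0..32 {
        let limit = Arc::clone(&warmup_limit);
        tasks.push(tokio::spawn(async move {
            // Each warmup task waits here until one of the 8 permits is free;
            // holding the permit bounds how many tenants do startup I/O at once.
            let _permit = limit.acquire().await.expect("semaphore is never closed");
            println!("warming up tenant {tenant}");
            // ... preload remote metadata, attach, activate ...
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}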
@@ -283,6 +293,7 @@ struct PageServerConfigBuilder { log_format: BuilderValue, + concurrent_tenant_warmup: BuilderValue, concurrent_tenant_size_logical_size_queries: BuilderValue, metric_collection_interval: BuilderValue, @@ -340,6 +351,8 @@ impl Default for PageServerConfigBuilder { .expect("cannot parse default keepalive interval")), log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), + concurrent_tenant_warmup: Set(NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP) + .expect("Invalid default constant")), concurrent_tenant_size_logical_size_queries: Set( ConfigurableSemaphore::DEFAULT_INITIAL, ), @@ -453,6 +466,10 @@ impl PageServerConfigBuilder { self.log_format = BuilderValue::Set(log_format) } + pub fn concurrent_tenant_warmup(&mut self, u: NonZeroUsize) { + self.concurrent_tenant_warmup = BuilderValue::Set(u); + } + pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) { self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u); } @@ -518,6 +535,9 @@ impl PageServerConfigBuilder { } pub fn build(self) -> anyhow::Result { + let concurrent_tenant_warmup = self + .concurrent_tenant_warmup + .ok_or(anyhow!("missing concurrent_tenant_warmup"))?; let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries .ok_or(anyhow!( @@ -570,6 +590,7 @@ impl PageServerConfigBuilder { .broker_keepalive_interval .ok_or(anyhow!("No broker keepalive interval provided"))?, log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, + concurrent_tenant_warmup: ConfigurableSemaphore::new(concurrent_tenant_warmup), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new( concurrent_tenant_size_logical_size_queries, ), @@ -807,6 +828,11 @@ impl PageServerConf { "log_format" => builder.log_format( LogFormat::from_config(&parse_toml_string(key, item)?)? ), + "concurrent_tenant_warmup" => builder.concurrent_tenant_warmup({ + let input = parse_toml_string(key, item)?; + let permits = input.parse::().context("expected a number of initial permits, not {s:?}")?; + NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")? 
+ }), "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({ let input = parse_toml_string(key, item)?; let permits = input.parse::().context("expected a number of initial permits, not {s:?}")?; @@ -904,6 +930,10 @@ impl PageServerConf { broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), broker_keepalive_interval: Duration::from_secs(5000), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), + concurrent_tenant_warmup: ConfigurableSemaphore::new( + NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP) + .expect("Invalid default constant"), + ), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default( ), @@ -1122,6 +1152,9 @@ background_task_maximum_delay = '334 s' storage_broker::DEFAULT_KEEPALIVE_INTERVAL )?, log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), + concurrent_tenant_warmup: ConfigurableSemaphore::new( + NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap() + ), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(), @@ -1188,6 +1221,9 @@ background_task_maximum_delay = '334 s' broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(), broker_keepalive_interval: Duration::from_secs(5), log_format: LogFormat::Json, + concurrent_tenant_warmup: ConfigurableSemaphore::new( + NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP).unwrap() + ), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index a7d1e79614..601fad5bde 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -34,6 +34,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; +use crate::tenant::mgr::GetActiveTenantError; use crate::tenant::mgr::{ GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError, TenantSlotUpsertError, TenantStateError, @@ -67,6 +68,11 @@ use utils::{ // Imports only used for testing APIs use pageserver_api::models::ConfigureFailpointsRequest; +// For APIs that require an Active tenant, how long should we block waiting for that state? +// This is not functionally necessary (clients will retry), but avoids generating a lot of +// failed API calls while tenants are activating. +const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000); + pub struct State { conf: &'static PageServerConf, tenant_manager: Arc, @@ -233,6 +239,19 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(e: GetActiveTenantError) -> ApiError { + match e { + GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)), + GetActiveTenantError::Cancelled => ApiError::ShuttingDown, + GetActiveTenantError::NotFound(gte) => gte.into(), + GetActiveTenantError::WaitForActiveTimeout { .. 
} => { + ApiError::ResourceUnavailable(format!("{}", e).into()) + } + } + } +} + impl From for ApiError { fn from(e: SetNewTenantConfigError) -> ApiError { match e { @@ -435,7 +454,10 @@ async fn timeline_create_handler( let state = get_state(&request); async { - let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?; + let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?; + + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + match tenant.create_timeline( new_timeline_id, request_data.ancestor_timeline_id.map(TimelineId::from), @@ -694,11 +716,23 @@ async fn timeline_delete_handler( let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let state = get_state(&request); - state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx) - .instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id)) + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id, false) + .map_err(|e| { + match e { + // GetTenantError has a built-in conversion to ApiError, but in this context we don't + // want to treat missing tenants as 404, to avoid ambiguity with successful deletions. + GetTenantError::NotFound(_) => ApiError::PreconditionFailed( + "Requested tenant is missing".to_string().into_boxed_str(), + ), + e => e.into(), + } + })?; + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id)) .await?; json_response(StatusCode::ACCEPTED, ()) @@ -1136,7 +1170,10 @@ async fn tenant_create_handler( // We created the tenant. Existing API semantics are that the tenant // is Active when this function returns. - if let res @ Err(_) = new_tenant.wait_to_become_active().await { + if let res @ Err(_) = new_tenant + .wait_to_become_active(ACTIVE_TENANT_TIMEOUT) + .await + { // This shouldn't happen because we just created the tenant directory // in tenant::mgr::create_tenant, and there aren't any remote timelines // to load, so, nothing can really fail during load. @@ -1560,9 +1597,7 @@ async fn disk_usage_eviction_run( } } - let config = json_request::(&mut r) - .await - .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?; + let config = json_request::(&mut r).await?; let usage = Usage { config, diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index ba6fd00bd1..45c01b71d1 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -684,14 +684,54 @@ pub static STARTUP_IS_LOADING: Lazy = Lazy::new(|| { .expect("Failed to register pageserver_startup_is_loading") }); -/// How long did tenants take to go from construction to active state? -pub(crate) static TENANT_ACTIVATION: Lazy = Lazy::new(|| { - register_histogram!( +/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things +/// like how long it took to load. +/// +/// Note that these are process-global metrics, _not_ per-tenant metrics. Per-tenant +/// metrics are rather expensive, and usually fine grained stuff makes more sense +/// at a timeline level than tenant level. +pub(crate) struct TenantMetrics { + /// How long did tenants take to go from construction to active state? 
+ pub(crate) activation: Histogram, + pub(crate) preload: Histogram, + pub(crate) attach: Histogram, + + /// How many tenants are included in the initial startup of the pageserver? + pub(crate) startup_scheduled: IntCounter, + pub(crate) startup_complete: IntCounter, +} + +pub(crate) static TENANT: Lazy<TenantMetrics> = Lazy::new(|| { + TenantMetrics { + activation: register_histogram!( "pageserver_tenant_activation_seconds", "Time taken by tenants to activate, in seconds", CRITICAL_OP_BUCKETS.into() ) - .expect("Failed to register pageserver_tenant_activation_seconds metric") + .expect("Failed to register metric"), + preload: register_histogram!( + "pageserver_tenant_preload_seconds", + "Time taken by tenants to load remote metadata on startup/attach, in seconds", + CRITICAL_OP_BUCKETS.into() + ) + .expect("Failed to register metric"), + attach: register_histogram!( + "pageserver_tenant_attach_seconds", + "Time taken by tenants to initialize, after remote metadata is already loaded", + CRITICAL_OP_BUCKETS.into() + ) + .expect("Failed to register metric"), + startup_scheduled: register_int_counter!( + "pageserver_tenant_startup_scheduled", + "Number of tenants included in pageserver startup (doesn't count tenants attached later)" + ).expect("Failed to register metric"), + startup_complete: register_int_counter!( + "pageserver_tenant_startup_complete", + "Number of tenants that have completed warm-up, or activated on-demand during initial startup: \ + should eventually reach `pageserver_tenant_startup_scheduled_total`. Does not include broken \ + tenants: such cases will lead to this metric never reaching the scheduled count." + ).expect("Failed to register metric"), +} }); /// Each `Timeline`'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric. @@ -2213,6 +2253,9 @@ pub fn preinitialize_metrics() { // Deletion queue stats Lazy::force(&DELETION_QUEUE); + // Tenant stats + Lazy::force(&TENANT); + // Tenant manager stats Lazy::force(&TENANT_MANAGER); diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index b80a498c82..cb1b2b8011 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -561,9 +561,14 @@ pub async fn shutdown_watcher() { /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or /// `tokio::task::JoinSet::spawn`. pub fn shutdown_token() -> CancellationToken { - SHUTDOWN_TOKEN - .try_with(|t| t.clone()) - .expect("shutdown_token() called in an unexpected task or thread") + let res = SHUTDOWN_TOKEN.try_with(|t| t.clone()); + + if cfg!(test) { + // in tests this method is called from non-taskmgr spawned tasks, and that is all ok. + res.unwrap_or_default() + } else { + res.expect("shutdown_token() called in an unexpected task or thread") + } } /// Has the current task been requested to shut down?
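The new preload/attach histograms above are driven with the usual timer lifecycle: start a timer when a phase begins, then either record it or discard it (as the attach path later does for `SpawnMode::Create`, where no preload happens). A small self-contained sketch of that pattern, written against the plain `prometheus` crate rather than the pageserver's metrics wrapper; names are illustrative:

use prometheus::{register_histogram, Histogram};

fn timed_phase(histogram: &Histogram, skip: bool) {
    let timer = histogram.start_timer();
    if skip {
        // Analogous to SpawnMode::Create: no work was done, so don't skew the histogram.
        timer.stop_and_discard();
        return;
    }
    // ... the work being measured ...
    timer.observe_duration();
}

fn main() {
    let preload: Histogram = register_histogram!(
        "example_tenant_preload_seconds",
        "Time taken to load remote metadata on startup/attach, in seconds"
    )
    .expect("Failed to register metric");

    timed_phase(&preload, false); // normal attach: duration is recorded
    timed_phase(&preload, true); // creation: timer is discarded
}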
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 969210622c..eceef6bf78 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -36,6 +36,8 @@ use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext; use utils::sync::gate::Gate; use utils::sync::gate::GateGuard; +use utils::timeout::timeout_cancellable; +use utils::timeout::TimeoutCancellableError; use self::config::AttachedLocationConfig; use self::config::AttachmentMode; @@ -59,7 +61,7 @@ use crate::deletion_queue::DeletionQueueClient; use crate::deletion_queue::DeletionQueueError; use crate::import_datadir; use crate::is_uninit_mark; -use crate::metrics::TENANT_ACTIVATION; +use crate::metrics::TENANT; use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC}; use crate::repository::GcResult; use crate::task_mgr; @@ -226,7 +228,7 @@ pub struct Tenant { /// The value creation timestamp, used to measure activation delay, see: /// - loading_started_at: Instant, + constructed_at: Instant, state: watch::Sender, @@ -276,6 +278,11 @@ pub struct Tenant { eviction_task_tenant_state: tokio::sync::Mutex, + /// If the tenant is in Activating state, notify this to encourage it + /// to proceed to Active as soon as possible, rather than waiting for lazy + /// background warmup. + pub(crate) activate_now_sem: tokio::sync::Semaphore, + pub(crate) delete_progress: Arc>, // Cancellation token fires when we have entered shutdown(). This is a parent of @@ -622,6 +629,14 @@ impl Tenant { "attach tenant", false, async move { + // Is this tenant being spawned as part of process startup? + let starting_up = init_order.is_some(); + scopeguard::defer! { + if starting_up { + TENANT.startup_complete.inc(); + } + } + // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state. let make_broken = |t: &Tenant, err: anyhow::Error| { @@ -648,8 +663,62 @@ impl Tenant { .as_mut() .and_then(|x| x.initial_tenant_load_remote.take()); + enum AttachType<'a> { + // During pageserver startup, we are attaching this tenant lazily in the background + Warmup(tokio::sync::SemaphorePermit<'a>), + // During pageserver startup, we are attaching this tenant as soon as we can, + // because a client tried to access it. + OnDemand, + // During normal operations after startup, we are attaching a tenant. + Normal, + } + + // Before doing any I/O, wait for either or: + // - A client to attempt to access to this tenant (on-demand loading) + // - A permit to become available in the warmup semaphore (background warmup) + // + // Some-ness of init_order is how we know if we're attaching during startup or later + // in process lifetime. + let attach_type = if init_order.is_some() { + tokio::select!( + _ = tenant_clone.activate_now_sem.acquire() => { + tracing::info!("Activating tenant (on-demand)"); + AttachType::OnDemand + }, + permit_result = conf.concurrent_tenant_warmup.inner().acquire() => { + match permit_result { + Ok(p) => { + tracing::info!("Activating tenant (warmup)"); + AttachType::Warmup(p) + } + Err(_) => { + // This is unexpected: the warmup semaphore should stay alive + // for the lifetime of init_order. Log a warning and proceed. + tracing::warn!("warmup_limit semaphore unexpectedly closed"); + AttachType::Normal + } + } + + } + _ = tenant_clone.cancel.cancelled() => { + // This is safe, but should be pretty rare: it is interesting if a tenant + // stayed in Activating for such a long time that shutdown found it in + // that state. 
+ tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation"); + return Ok(()); + }, + ) + } else { + AttachType::Normal + }; + + let preload_timer = TENANT.preload.start_timer(); let preload = match mode { - SpawnMode::Create => {None}, + SpawnMode::Create => { + // Don't count the skipped preload into the histogram of preload durations + preload_timer.stop_and_discard(); + None + }, SpawnMode::Normal => { match &remote_storage { Some(remote_storage) => Some( @@ -659,7 +728,11 @@ impl Tenant { tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()), ) .await { - Ok(p) => p, + Ok(p) => { + preload_timer.observe_duration(); + p + } + , Err(e) => { make_broken(&tenant_clone, anyhow::anyhow!(e)); return Ok(()); @@ -721,15 +794,43 @@ impl Tenant { } } + // We will time the duration of the attach phase unless this is a creation (attach will do no work) + let attach_timer = match mode { + SpawnMode::Create => None, + SpawnMode::Normal => {Some(TENANT.attach.start_timer())} + }; match tenant_clone.attach(preload, &ctx).await { Ok(()) => { info!("attach finished, activating"); + if let Some(t)= attach_timer {t.observe_duration();} tenant_clone.activate(broker_client, None, &ctx); } Err(e) => { + if let Some(t)= attach_timer {t.observe_duration();} make_broken(&tenant_clone, anyhow::anyhow!(e)); } } + + // If we are doing an opportunistic warmup attachment at startup, initialize + // logical size at the same time. This is better than starting a bunch of idle tenants + // with cold caches and then coming back later to initialize their logical sizes. + // + // It also prevents the warmup proccess competing with the concurrency limit on + // logical size calculations: if logical size calculation semaphore is saturated, + // then warmup will wait for that before proceeding to the next tenant. + if let AttachType::Warmup(_permit) = attach_type { + let mut futs = FuturesUnordered::new(); + let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect(); + for t in timelines { + futs.push(t.await_initial_logical_size()) + } + tracing::info!("Waiting for initial logical sizes while warming up..."); + while futs.next().await.is_some() { + + } + tracing::info!("Warm-up complete"); + } + Ok(()) } .instrument({ @@ -1696,6 +1797,15 @@ impl Tenant { Ok(loaded_timeline) } + pub(crate) async fn delete_timeline( + self: Arc, + timeline_id: TimelineId, + ) -> Result<(), DeleteTimelineError> { + DeleteTimelineFlow::run(&self, timeline_id, false).await?; + + Ok(()) + } + /// perform one garbage collection iteration, removing old data files from disk. /// this function is periodically called by gc task. /// also it can be explicitly requested through page server api 'do_gc' command. @@ -1857,7 +1967,7 @@ impl Tenant { ); *current_state = TenantState::Active; - let elapsed = self.loading_started_at.elapsed(); + let elapsed = self.constructed_at.elapsed(); let total_timelines = timelines_accessor.len(); // log a lot of stuff, because some tenants sometimes suffer from user-visible @@ -1872,7 +1982,7 @@ impl Tenant { "activation attempt finished" ); - TENANT_ACTIVATION.observe(elapsed.as_secs_f64()); + TENANT.activation.observe(elapsed.as_secs_f64()); }); } } @@ -2127,18 +2237,41 @@ impl Tenant { self.state.subscribe() } - pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> { + /// The activate_now semaphore is initialized with zero units. 
As soon as + /// we add a unit, waiters will be able to acquire a unit and proceed. + pub(crate) fn activate_now(&self) { + self.activate_now_sem.add_permits(1); + } + + pub(crate) async fn wait_to_become_active( + &self, + timeout: Duration, + ) -> Result<(), GetActiveTenantError> { let mut receiver = self.state.subscribe(); loop { let current_state = receiver.borrow_and_update().clone(); match current_state { TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active - receiver.changed().await.map_err( - |_e: tokio::sync::watch::error::RecvError| - // Tenant existed but was dropped: report it as non-existent - GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id)) - )?; + self.activate_now(); + match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await { + Ok(r) => { + r.map_err( + |_e: tokio::sync::watch::error::RecvError| + // Tenant existed but was dropped: report it as non-existent + GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id)) + )? + } + Err(TimeoutCancellableError::Cancelled) => { + return Err(GetActiveTenantError::Cancelled); + } + Err(TimeoutCancellableError::Timeout) => { + return Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: Some(self.current_state()), + wait_time: timeout, + }); + } + } } TenantState::Active { .. } => { return Ok(()); @@ -2463,7 +2596,7 @@ impl Tenant { conf, // using now here is good enough approximation to catch tenants with really long // activation times. - loading_started_at: Instant::now(), + constructed_at: Instant::now(), tenant_conf: Arc::new(RwLock::new(attached_conf)), timelines: Mutex::new(HashMap::new()), timelines_creating: Mutex::new(HashSet::new()), @@ -2475,6 +2608,7 @@ impl Tenant { cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), + activate_now_sem: tokio::sync::Semaphore::new(0), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), cancel: CancellationToken::default(), gate: Gate::new(format!("Tenant<{tenant_shard_id}>")), @@ -3059,6 +3193,7 @@ impl Tenant { storage, &self.tenant_shard_id, &existing_initdb_timeline_id, + &self.cancel, ) .await .context("download initdb tar")?; @@ -3099,6 +3234,7 @@ impl Tenant { &timeline_id, pgdata_zstd.try_clone().await?, tar_zst_size, + &self.cancel, ) .await }, @@ -3106,8 +3242,7 @@ impl Tenant { 3, u32::MAX, "persist_initdb_tar_zst", - // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), + backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), ) .await?; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index acd311ace6..e8491f26db 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -71,6 +71,7 @@ async fn create_remote_delete_mark( conf: &PageServerConf, remote_storage: &GenericRemoteStorage, tenant_shard_id: &TenantShardId, + cancel: &CancellationToken, ) -> Result<(), DeleteTenantError> { let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?; @@ -87,8 +88,7 @@ async fn create_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "mark_upload", - // TODO: use a cancellation token 
(https://github.com/neondatabase/neon/issues/5066) - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), + backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), ) .await .context("mark_upload")?; @@ -170,6 +170,7 @@ async fn remove_tenant_remote_delete_mark( conf: &PageServerConf, remote_storage: Option<&GenericRemoteStorage>, tenant_shard_id: &TenantShardId, + cancel: &CancellationToken, ) -> Result<(), DeleteTenantError> { if let Some(remote_storage) = remote_storage { let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?; @@ -179,8 +180,7 @@ async fn remove_tenant_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "remove_tenant_remote_delete_mark", - // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), + backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), ) .await .context("remove_tenant_remote_delete_mark")?; @@ -322,9 +322,15 @@ impl DeleteTenantFlow { // Though sounds scary, different mark name? // Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state. if let Some(remote_storage) = &remote_storage { - create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id) - .await - .context("remote_mark")? + create_remote_delete_mark( + conf, + remote_storage, + &tenant.tenant_shard_id, + // Can't use tenant.cancel, it's already shut down. TODO: wire in an appropriate token + &CancellationToken::new(), + ) + .await + .context("remote_mark")? } fail::fail_point!("tenant-delete-before-create-local-mark", |_| { @@ -524,8 +530,14 @@ impl DeleteTenantFlow { .context("timelines dir not empty")?; } - remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_shard_id) - .await?; + remove_tenant_remote_delete_mark( + conf, + remote_storage.as_ref(), + &tenant.tenant_shard_id, + // Can't use tenant.cancel, it's already shut down. 
TODO: wire in an appropriate token + &CancellationToken::new(), + ) + .await?; fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| { Err(anyhow::anyhow!( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index f53951e1d3..b2f14db9f7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -28,7 +28,7 @@ use crate::control_plane_client::{ ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError, }; use crate::deletion_queue::DeletionQueueClient; -use crate::metrics::TENANT_MANAGER as METRICS; +use crate::metrics::{TENANT, TENANT_MANAGER as METRICS}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::{ AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt, @@ -44,7 +44,6 @@ use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use super::delete::DeleteTenantError; -use super::timeline::delete::DeleteTimelineFlow; use super::TenantSharedResources; /// For a tenant that appears in TenantsMap, it may either be @@ -430,6 +429,13 @@ pub async fn init_tenant_mgr( let tenant_generations = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?; + tracing::info!( + "Attaching {} tenants at startup, warming up {} at a time", + tenant_configs.len(), + conf.concurrent_tenant_warmup.initial_permits() + ); + TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64); + // Construct `Tenant` objects and start them running for (tenant_shard_id, location_conf) in tenant_configs { let tenant_dir_path = conf.tenant_path(&tenant_shard_id); @@ -848,17 +854,6 @@ impl TenantManager { } } - pub(crate) async fn delete_timeline( - &self, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, - _ctx: &RequestContext, - ) -> Result<(), DeleteTimelineError> { - let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?; - DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; - Ok(()) - } - #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, @@ -1221,7 +1216,10 @@ pub(crate) async fn get_active_tenant_with_timeout( // Fast path: we don't need to do any async waiting. 
return Ok(tenant.clone()); } - _ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id), + _ => { + tenant.activate_now(); + (WaitFor::Tenant(tenant.clone()), tenant_shard_id) + } } } Some(TenantSlot::Secondary) => { @@ -1275,28 +1273,10 @@ pub(crate) async fn get_active_tenant_with_timeout( }; tracing::debug!("Waiting for tenant to enter active state..."); - match timeout_cancellable( - deadline.duration_since(Instant::now()), - cancel, - tenant.wait_to_become_active(), - ) - .await - { - Ok(Ok(())) => Ok(tenant), - Ok(Err(e)) => Err(e), - Err(TimeoutCancellableError::Timeout) => { - let latest_state = tenant.current_state(); - if latest_state == TenantState::Active { - Ok(tenant) - } else { - Err(GetActiveTenantError::WaitForActiveTimeout { - latest_state: Some(latest_state), - wait_time: timeout, - }) - } - } - Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled), - } + tenant + .wait_to_become_active(deadline.duration_since(Instant::now())) + .await?; + Ok(tenant) } pub(crate) async fn delete_tenant( diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 4b271a7395..52ee8f49ce 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -196,10 +196,12 @@ pub(crate) use upload::upload_initdb_dir; use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; +use utils::timeout::{timeout_cancellable, TimeoutCancellableError}; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use std::ops::DerefMut; @@ -316,6 +318,47 @@ pub struct RemoteTimelineClient { storage_impl: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, + + cancel: CancellationToken, +} + +/// This timeout is intended to deal with hangs in lower layers, e.g. stuck TCP flows. It is not +/// intended to be snappy enough for prompt shutdown, as we have a CancellationToken for that. +const UPLOAD_TIMEOUT: Duration = Duration::from_secs(120); +const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(120); + +/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to anyhow. +/// +/// This is a convenience for the various upload functions. In future +/// the anyhow::Error result should be replaced with a more structured type that +/// enables callers to avoid handling shutdown as an error. +async fn upload_cancellable(cancel: &CancellationToken, future: F) -> anyhow::Result<()> +where + F: std::future::Future>, +{ + match timeout_cancellable(UPLOAD_TIMEOUT, cancel, future).await { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(e), + Err(TimeoutCancellableError::Timeout) => Err(anyhow::anyhow!("Timeout")), + Err(TimeoutCancellableError::Cancelled) => Err(anyhow::anyhow!("Shutting down")), + } +} +/// Wrapper for timeout_cancellable that flattens result and converts TimeoutCancellableError to DownloaDError. 
+async fn download_cancellable( + cancel: &CancellationToken, + future: F, +) -> Result +where + F: std::future::Future>, +{ + match timeout_cancellable(DOWNLOAD_TIMEOUT, cancel, future).await { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e), + Err(TimeoutCancellableError::Timeout) => { + Err(DownloadError::Other(anyhow::anyhow!("Timed out"))) + } + Err(TimeoutCancellableError::Cancelled) => Err(DownloadError::Cancelled), + } } impl RemoteTimelineClient { @@ -351,6 +394,7 @@ impl RemoteTimelineClient { &tenant_shard_id, &timeline_id, )), + cancel: CancellationToken::new(), } } @@ -501,6 +545,7 @@ impl RemoteTimelineClient { &self, layer_file_name: &LayerFileName, layer_metadata: &LayerFileMetadata, + cancel: &CancellationToken, ) -> anyhow::Result { let downloaded_size = { let _unfinished_gauge_guard = self.metrics.call_begin( @@ -517,6 +562,7 @@ impl RemoteTimelineClient { self.timeline_id, layer_file_name, layer_metadata, + cancel, ) .measure_remote_op( self.tenant_shard_id.tenant_id, @@ -971,6 +1017,7 @@ impl RemoteTimelineClient { &self.timeline_id, self.generation, &index_part_with_deleted_at, + &self.cancel, ) }, |_e| false, @@ -980,8 +1027,7 @@ impl RemoteTimelineClient { // when executed as part of tenant deletion this happens in the background 2, "persist_index_part_with_deleted_flag", - // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), + backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), ) .await?; @@ -1281,6 +1327,7 @@ impl RemoteTimelineClient { path, layer_metadata, self.generation, + &self.cancel, ) .measure_remote_op( self.tenant_shard_id.tenant_id, @@ -1307,6 +1354,7 @@ impl RemoteTimelineClient { &self.timeline_id, self.generation, index_part, + &self.cancel, ) .measure_remote_op( self.tenant_shard_id.tenant_id, @@ -1828,6 +1876,7 @@ mod tests { &self.harness.tenant_shard_id, &TIMELINE_ID, )), + cancel: CancellationToken::new(), }) } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index ed32c4eed9..d3956163c8 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -5,7 +5,6 @@ use std::collections::HashSet; use std::future::Future; -use std::time::Duration; use anyhow::{anyhow, Context}; use camino::{Utf8Path, Utf8PathBuf}; @@ -14,13 +13,17 @@ use tokio::fs::{self, File, OpenOptions}; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio_util::sync::CancellationToken; use tracing::warn; +use utils::timeout::timeout_cancellable; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; -use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; +use crate::tenant::remote_timeline_client::{ + download_cancellable, remote_layer_path, remote_timelines_path, DOWNLOAD_TIMEOUT, +}; use crate::tenant::storage_layer::LayerFileName; use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::Generation; +use crate::virtual_file::on_fatal_io_error; use crate::TEMP_FILE_SUFFIX; use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode}; use utils::crashsafe::path_with_suffix_extension; @@ -32,8 +35,6 @@ use super::{ FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, }; -static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120); - /// /// If 'metadata' is given, we will 
validate that the downloaded file's size matches that /// in the metadata. (In the future, we might do more cross-checks, like CRC validation) @@ -46,6 +47,7 @@ pub async fn download_layer_file<'a>( timeline_id: TimelineId, layer_file_name: &'a LayerFileName, layer_metadata: &'a LayerFileMetadata, + cancel: &CancellationToken, ) -> Result { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -73,14 +75,18 @@ pub async fn download_layer_file<'a>( // If pageserver crashes the temp file will be deleted on startup and re-downloaded. let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION); + let cancel_inner = cancel.clone(); let (mut destination_file, bytes_amount) = download_retry( || async { let destination_file = tokio::fs::File::create(&temp_file_path) .await .with_context(|| format!("create a destination file for layer '{temp_file_path}'")) .map_err(DownloadError::Other)?; - let download = storage - .download(&remote_path) + + // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local + // file: the write to local file doesn't start until after the request header is returned + // and we start draining the body stream below + let download = download_cancellable(&cancel_inner, storage.download(&remote_path)) .await .with_context(|| { format!( @@ -94,12 +100,33 @@ pub async fn download_layer_file<'a>( let mut reader = tokio_util::io::StreamReader::new(download.download_stream); - let bytes_amount = tokio::time::timeout( - MAX_DOWNLOAD_DURATION, + // Cancellation safety: it is safe to cancel this future because it is writing into a temporary file, + // and we will unlink the temporary file if there is an error. This unlink is important because we + // are in a retry loop, and we wouldn't want to leave behind a rogue write I/O to a file that + // we will imminiently try and write to again. + let bytes_amount: u64 = match timeout_cancellable( + DOWNLOAD_TIMEOUT, + &cancel_inner, tokio::io::copy_buf(&mut reader, &mut destination_file), ) .await - .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))? + .with_context(|| { + format!( + "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}" + ) + }) + .map_err(DownloadError::Other)? + { + Ok(b) => Ok(b), + Err(e) => { + // Remove incomplete files: on restart Timeline would do this anyway, but we must + // do it here for the retry case. 
+ if let Err(e) = tokio::fs::remove_file(&temp_file_path).await { + on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}")); + } + Err(e) + } + } .with_context(|| { format!( "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}" @@ -112,6 +139,7 @@ pub async fn download_layer_file<'a>( Ok((destination_file, bytes_amount)) }, &format!("download {remote_path:?}"), + cancel, ) .await?; @@ -188,8 +216,14 @@ pub async fn list_remote_timelines( anyhow::bail!("storage-sync-list-remote-timelines"); }); + let cancel_inner = cancel.clone(); let listing = download_retry_forever( - || storage.list(Some(&remote_path), ListingMode::WithDelimiter), + || { + download_cancellable( + &cancel_inner, + storage.list(Some(&remote_path), ListingMode::WithDelimiter), + ) + }, &format!("list timelines for {tenant_shard_id}"), cancel, ) @@ -230,9 +264,13 @@ async fn do_download_index_part( let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation); + let cancel_inner = cancel.clone(); let index_part_bytes = download_retry_forever( || async { - let index_part_download = storage.download(&remote_path).await?; + // Cancellation: if is safe to cancel this future because we're just downloading into + // a memory buffer, not touching local disk. + let index_part_download = + download_cancellable(&cancel_inner, storage.download(&remote_path)).await?; let mut index_part_bytes = Vec::new(); let mut stream = std::pin::pin!(index_part_download.download_stream); @@ -347,10 +385,7 @@ pub(super) async fn download_index_part( FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "listing index_part files", - // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) - backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error { - unreachable!() - }), + backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), ) .await .map_err(DownloadError::Other)?; @@ -389,6 +424,7 @@ pub(crate) async fn download_initdb_tar_zst( storage: &GenericRemoteStorage, tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, + cancel: &CancellationToken, ) -> Result<(Utf8PathBuf, File), DownloadError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -406,6 +442,8 @@ pub(crate) async fn download_initdb_tar_zst( "{INITDB_PATH}.download-{timeline_id}.{TEMP_FILE_SUFFIX}" )); + let cancel_inner = cancel.clone(); + let file = download_retry( || async { let file = OpenOptions::new() @@ -418,10 +456,14 @@ pub(crate) async fn download_initdb_tar_zst( .with_context(|| format!("tempfile creation {temp_path}")) .map_err(DownloadError::Other)?; - let download = storage.download(&remote_path).await?; + let download = + download_cancellable(&cancel_inner, storage.download(&remote_path)).await?; let mut download = tokio_util::io::StreamReader::new(download.download_stream); let mut writer = tokio::io::BufWriter::with_capacity(8 * 1024, file); + // TODO: this consumption of the response body should be subject to timeout + cancellation, but + // not without thinking carefully about how to recover safely from cancelling a write to + // local storage (e.g. 
by writing into a temp file as we do in download_layer) tokio::io::copy_buf(&mut download, &mut writer) .await .with_context(|| format!("download initdb.tar.zst at {remote_path:?}")) @@ -437,6 +479,7 @@ pub(crate) async fn download_initdb_tar_zst( Ok(file) }, &format!("download {remote_path}"), + cancel, ) .await .map_err(|e| { @@ -460,7 +503,11 @@ pub(crate) async fn download_initdb_tar_zst( /// with backoff. /// /// (See similar logic for uploads in `perform_upload_task`) -async fn download_retry(op: O, description: &str) -> Result +async fn download_retry( + op: O, + description: &str, + cancel: &CancellationToken, +) -> Result where O: FnMut() -> F, F: Future>, @@ -471,10 +518,7 @@ where FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, description, - // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) - backoff::Cancel::new(CancellationToken::new(), || -> DownloadError { - unreachable!() - }), + backoff::Cancel::new(cancel.clone(), || DownloadError::Cancelled), ) .await } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index d0744e7c83..11c6956875 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -4,14 +4,17 @@ use anyhow::{bail, Context}; use camino::Utf8Path; use fail::fail_point; use pageserver_api::shard::TenantShardId; -use std::io::ErrorKind; +use std::io::{ErrorKind, SeekFrom}; use tokio::fs::{self, File}; +use tokio::io::AsyncSeekExt; +use tokio_util::sync::CancellationToken; use super::Generation; use crate::{ config::PageServerConf, tenant::remote_timeline_client::{ index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path, + upload_cancellable, }, }; use remote_storage::GenericRemoteStorage; @@ -28,6 +31,7 @@ pub(super) async fn upload_index_part<'a>( timeline_id: &TimelineId, generation: Generation, index_part: &'a IndexPart, + cancel: &CancellationToken, ) -> anyhow::Result<()> { tracing::trace!("uploading new index part"); @@ -43,14 +47,16 @@ pub(super) async fn upload_index_part<'a>( let index_part_bytes = bytes::Bytes::from(index_part_bytes); let remote_path = remote_index_path(tenant_shard_id, timeline_id, generation); - storage - .upload_storage_object( + upload_cancellable( + cancel, + storage.upload_storage_object( futures::stream::once(futures::future::ready(Ok(index_part_bytes))), index_part_size, &remote_path, - ) - .await - .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'")) + ), + ) + .await + .with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'")) } /// Attempts to upload given layer files. 
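These upload and download call sites all funnel through the same primitive: race the wrapped future against both a timeout and a `CancellationToken`. A sketch of that combinator follows, with the signature and error enum inferred from how `timeout_cancellable` is used in this diff; the real implementation lives in `utils::timeout` and may differ in detail:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[derive(thiserror::Error, Debug)]
pub enum TimeoutCancellableError {
    #[error("Timed out")]
    Timeout,
    #[error("Cancelled")]
    Cancelled,
}

/// Run `future` to completion unless `duration` elapses or `cancel` fires first.
pub async fn timeout_cancellable<F>(
    duration: Duration,
    cancel: &CancellationToken,
    future: F,
) -> Result<F::Output, TimeoutCancellableError>
where
    F: std::future::Future,
{
    tokio::select! {
        r = future => Ok(r),
        _ = tokio::time::sleep(duration) => Err(TimeoutCancellableError::Timeout),
        _ = cancel.cancelled() => Err(TimeoutCancellableError::Cancelled),
    }
}

The `upload_cancellable` and `download_cancellable` wrappers introduced above then flatten the nested `Result` and map `Timeout`/`Cancelled` into `anyhow::Error` or `DownloadError` respectively.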
@@ -63,6 +69,7 @@ pub(super) async fn upload_timeline_layer<'a>( source_path: &'a Utf8Path, known_metadata: &'a LayerFileMetadata, generation: Generation, + cancel: &CancellationToken, ) -> anyhow::Result<()> { fail_point!("before-upload-layer", |_| { bail!("failpoint before-upload-layer") @@ -106,8 +113,7 @@ pub(super) async fn upload_timeline_layer<'a>( let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE); - storage - .upload(reader, fs_size, &storage_path, None) + upload_cancellable(cancel, storage.upload(reader, fs_size, &storage_path, None)) .await .with_context(|| format!("upload layer from local path '{source_path}'"))?; @@ -119,16 +125,22 @@ pub(crate) async fn upload_initdb_dir( storage: &GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, - initdb_tar_zst: File, + mut initdb_tar_zst: File, size: u64, + cancel: &CancellationToken, ) -> anyhow::Result<()> { tracing::trace!("uploading initdb dir"); + // We might have read somewhat into the file already in the prior retry attempt + initdb_tar_zst.seek(SeekFrom::Start(0)).await?; + let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE); let remote_path = remote_initdb_archive_path(tenant_id, timeline_id); - storage - .upload_storage_object(file, size as usize, &remote_path) - .await - .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'")) + upload_cancellable( + cancel, + storage.upload_storage_object(file, size as usize, &remote_path), + ) + .await + .with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'")) } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 69a2893456..9a8ddc1a6b 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -259,8 +259,9 @@ impl Layer { layer .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx) - .instrument(tracing::info_span!("get_value_reconstruct_data", layer=%self)) + .instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self)) .await + .with_context(|| format!("get_value_reconstruct_data for layer {self}")) } /// Download the layer if evicted. @@ -654,7 +655,6 @@ impl LayerInner { } /// Cancellation safe. 
- #[tracing::instrument(skip_all, fields(layer=%self))] async fn get_or_maybe_download( self: &Arc, allow_download: bool, @@ -663,95 +663,101 @@ impl LayerInner { let mut init_permit = None; loop { - let download = move |permit| async move { - // disable any scheduled but not yet running eviction deletions for this - let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); + let download = move |permit| { + async move { + // disable any scheduled but not yet running eviction deletions for this + let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); - // count cancellations, which currently remain largely unexpected - let init_cancelled = - scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); + // count cancellations, which currently remain largely unexpected + let init_cancelled = + scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); - // no need to make the evict_and_wait wait for the actual download to complete - drop(self.status.send(Status::Downloaded)); + // no need to make the evict_and_wait wait for the actual download to complete + drop(self.status.send(Status::Downloaded)); - let timeline = self - .timeline - .upgrade() - .ok_or_else(|| DownloadError::TimelineShutdown)?; + let timeline = self + .timeline + .upgrade() + .ok_or_else(|| DownloadError::TimelineShutdown)?; - // FIXME: grab a gate + // FIXME: grab a gate - let can_ever_evict = timeline.remote_client.as_ref().is_some(); + let can_ever_evict = timeline.remote_client.as_ref().is_some(); - // check if we really need to be downloaded; could have been already downloaded by a - // cancelled previous attempt. - let needs_download = self - .needs_download() - .await - .map_err(DownloadError::PreStatFailed)?; + // check if we really need to be downloaded; could have been already downloaded by a + // cancelled previous attempt. + let needs_download = self + .needs_download() + .await + .map_err(DownloadError::PreStatFailed)?; - let permit = if let Some(reason) = needs_download { - if let NeedsDownload::NotFile(ft) = reason { - return Err(DownloadError::NotFile(ft)); + let permit = if let Some(reason) = needs_download { + if let NeedsDownload::NotFile(ft) = reason { + return Err(DownloadError::NotFile(ft)); + } + + // only reset this after we've decided we really need to download. otherwise it'd + // be impossible to mark cancelled downloads for eviction, like one could imagine + // we would like to do for prefetching which was not needed. + self.wanted_evicted.store(false, Ordering::Release); + + if !can_ever_evict { + return Err(DownloadError::NoRemoteStorage); + } + + if let Some(ctx) = ctx { + self.check_expected_download(ctx)?; + } + + if !allow_download { + // this does look weird, but for LayerInner the "downloading" means also changing + // internal once related state ... + return Err(DownloadError::DownloadRequired); + } + + tracing::info!(%reason, "downloading on-demand"); + + self.spawn_download_and_wait(timeline, permit).await? + } else { + // the file is present locally, probably by a previous but cancelled call to + // get_or_maybe_download. alternatively we might be running without remote storage. 
+ LAYER_IMPL_METRICS.inc_init_needed_no_download(); + + permit + }; + + let since_last_eviction = + self.last_evicted_at.lock().unwrap().map(|ts| ts.elapsed()); + if let Some(since_last_eviction) = since_last_eviction { + // FIXME: this will not always be recorded correctly until #6028 (the no + // download needed branch above) + LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction); } - // only reset this after we've decided we really need to download. otherwise it'd - // be impossible to mark cancelled downloads for eviction, like one could imagine - // we would like to do for prefetching which was not needed. - self.wanted_evicted.store(false, Ordering::Release); + let res = Arc::new(DownloadedLayer { + owner: Arc::downgrade(self), + kind: tokio::sync::OnceCell::default(), + version: next_version, + }); - if !can_ever_evict { - return Err(DownloadError::NoRemoteStorage); + self.access_stats.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::ResidenceChange, + ); + + let waiters = self.inner.initializer_count(); + if waiters > 0 { + tracing::info!( + waiters, + "completing the on-demand download for other tasks" + ); } - if let Some(ctx) = ctx { - self.check_expected_download(ctx)?; - } + scopeguard::ScopeGuard::into_inner(init_cancelled); - if !allow_download { - // this does look weird, but for LayerInner the "downloading" means also changing - // internal once related state ... - return Err(DownloadError::DownloadRequired); - } - - tracing::info!(%reason, "downloading on-demand"); - - self.spawn_download_and_wait(timeline, permit).await? - } else { - // the file is present locally, probably by a previous but cancelled call to - // get_or_maybe_download. alternatively we might be running without remote storage. - LAYER_IMPL_METRICS.inc_init_needed_no_download(); - - permit - }; - - let since_last_eviction = - self.last_evicted_at.lock().unwrap().map(|ts| ts.elapsed()); - if let Some(since_last_eviction) = since_last_eviction { - // FIXME: this will not always be recorded correctly until #6028 (the no - // download needed branch above) - LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction); + Ok((ResidentOrWantedEvicted::Resident(res), permit)) } - - let res = Arc::new(DownloadedLayer { - owner: Arc::downgrade(self), - kind: tokio::sync::OnceCell::default(), - version: next_version, - }); - - self.access_stats.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::ResidenceChange, - ); - - let waiters = self.inner.initializer_count(); - if waiters > 0 { - tracing::info!(waiters, "completing the on-demand download for other tasks"); - } - - scopeguard::ScopeGuard::into_inner(init_cancelled); - - Ok((ResidentOrWantedEvicted::Resident(res), permit)) + .instrument(tracing::info_span!("get_or_maybe_download", layer=%self)) }; if let Some(init_permit) = init_permit.take() { @@ -862,6 +868,7 @@ impl LayerInner { let result = client.download_layer_file( &this.desc.filename(), &this.metadata(), + &crate::task_mgr::shutdown_token() ) .await; diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 4b118442f4..7ff1873eda 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -54,29 +54,18 @@ impl BackgroundLoopKind { } } -pub(crate) enum RateLimitError { - Cancelled, -} - -pub(crate) async fn concurrent_background_tasks_rate_limit( +/// Cancellation safe. 
+pub(crate) async fn concurrent_background_tasks_rate_limit_permit( loop_kind: BackgroundLoopKind, _ctx: &RequestContext, - cancel: &CancellationToken, -) -> Result { +) -> impl Drop { let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE .with_label_values(&[loop_kind.as_static_str()]) .guard(); - tokio::select! { - permit = CONCURRENT_BACKGROUND_TASKS.acquire() => { - match permit { - Ok(permit) => Ok(permit), - Err(_closed) => unreachable!("we never close the semaphore"), - } - }, - _ = cancel.cancelled() => { - Err(RateLimitError::Cancelled) - } + match CONCURRENT_BACKGROUND_TASKS.acquire().await { + Ok(permit) => permit, + Err(_closed) => unreachable!("we never close the semaphore"), } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ac1922ccad..1e84fa1848 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -51,7 +51,7 @@ use crate::tenant::storage_layer::{ LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, ValueReconstructState, }; -use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError}; +use crate::tenant::tasks::BackgroundLoopKind; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, @@ -715,19 +715,27 @@ impl Timeline { flags: EnumSet, ctx: &RequestContext, ) -> Result<(), CompactionError> { - let _g = self.compaction_lock.lock().await; + // most likely the cancellation token is from background task, but in tests it could be the + // request task as well. + + let prepare = async move { + let guard = self.compaction_lock.lock().await; + + let permit = super::tasks::concurrent_background_tasks_rate_limit_permit( + BackgroundLoopKind::Compaction, + ctx, + ) + .await; + + (guard, permit) + }; // this wait probably never needs any "long time spent" logging, because we already nag if // compaction task goes over it's period (20s) which is quite often in production. - let _permit = match super::tasks::concurrent_background_tasks_rate_limit( - BackgroundLoopKind::Compaction, - ctx, - cancel, - ) - .await - { - Ok(permit) => permit, - Err(RateLimitError::Cancelled) => return Ok(()), + let (_guard, _permit) = tokio::select! { + tuple = prepare => { tuple }, + _ = self.cancel.cancelled() => return Ok(()), + _ = cancel.cancelled() => return Ok(()), }; let last_record_lsn = self.get_last_record_lsn(); @@ -1726,6 +1734,7 @@ impl Timeline { self.current_logical_size.current_size().accuracy(), logical_size::Accuracy::Exact, ); + self.current_logical_size.initialized.add_permits(1); return; }; @@ -1771,6 +1780,11 @@ impl Timeline { cancel: CancellationToken, background_ctx: RequestContext, ) { + scopeguard::defer! { + // Irrespective of the outcome of this operation, we should unblock anyone waiting for it. + self.current_logical_size.initialized.add_permits(1); + } + enum BackgroundCalculationError { Cancelled, Other(anyhow::Error), @@ -1782,22 +1796,22 @@ impl Timeline { let skip_concurrency_limiter = &skip_concurrency_limiter; async move { let cancel = task_mgr::shutdown_token(); - let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit( + let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit( BackgroundLoopKind::InitialLogicalSizeCalculation, background_ctx, - &cancel, ); use crate::metrics::initial_logical_size::StartCircumstances; let (_maybe_permit, circumstances) = tokio::select! 
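The compaction change above sets the pattern repeated for gc and eviction below: bundle every acquisition (the per-timeline lock, the background-task permit) into one future, race it against both the task-level and the timeline-level cancellation tokens, and return a no-op result if either fires first. A minimal sketch of that shape, assuming only tokio, tokio-util and anyhow; all names are illustrative:

use tokio::sync::{Mutex, Semaphore};
use tokio_util::sync::CancellationToken;

async fn run_guarded(
    lock: &Mutex<()>,
    limiter: &Semaphore,
    task_cancel: &CancellationToken,
    timeline_cancel: &CancellationToken,
) -> anyhow::Result<()> {
    // Acquire the lock and the concurrency permit together, so cancellation
    // can never leave us holding one but not the other.
    let prepare = async move {
        let guard = lock.lock().await;
        let permit = limiter.acquire().await.expect("semaphore is never closed");
        (guard, permit)
    };

    let (_guard, _permit) = tokio::select! {
        acquired = prepare => acquired,
        _ = task_cancel.cancelled() => return Ok(()),     // task shutdown
        _ = timeline_cancel.cancelled() => return Ok(()), // timeline shutdown
    };

    // ... do the rate-limited work while holding both ...
    Ok(())
}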
{ - res = wait_for_permit => { - match res { - Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit), - Err(RateLimitError::Cancelled) => { - return Err(BackgroundCalculationError::Cancelled); - } - } + permit = wait_for_permit => { + (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit) } + _ = self_ref.cancel.cancelled() => { + return Err(BackgroundCalculationError::Cancelled); + } + _ = cancel.cancelled() => { + return Err(BackgroundCalculationError::Cancelled); + }, () = skip_concurrency_limiter.cancelled() => { // Some action that is part of a end user interaction requested logical size // => break out of the rate limit @@ -3096,6 +3110,32 @@ impl Timeline { Ok(image_layers) } + + /// Wait until the background initial logical size calculation is complete, or + /// this Timeline is shut down. Calling this function will cause the initial + /// logical size calculation to skip waiting for the background jobs barrier. + pub(crate) async fn await_initial_logical_size(self: Arc) { + if let Some(await_bg_cancel) = self + .current_logical_size + .cancel_wait_for_background_loop_concurrency_limit_semaphore + .get() + { + await_bg_cancel.cancel(); + } else { + // We should not wait if we were not able to explicitly instruct + // the logical size cancellation to skip the concurrency limit semaphore. + // TODO: this is an unexpected case. We should restructure so that it + // can't happen. + tracing::info!( + "await_initial_logical_size: can't get semaphore cancel token, skipping" + ); + } + + tokio::select!( + _ = self.current_logical_size.initialized.acquire() => {}, + _ = self.cancel.cancelled() => {} + ) + } } #[derive(Default)] @@ -3852,7 +3892,14 @@ impl Timeline { /// within a layer file. We can only remove the whole file if it's fully /// obsolete. pub(super) async fn gc(&self) -> anyhow::Result { - let _g = self.gc_lock.lock().await; + // this is most likely the background tasks, but it might be the spawned task from + // immediate_gc + let cancel = crate::task_mgr::shutdown_token(); + let _g = tokio::select! { + guard = self.gc_lock.lock() => guard, + _ = self.cancel.cancelled() => return Ok(GcResult::default()), + _ = cancel.cancelled() => return Ok(GcResult::default()), + }; let timer = self.metrics.garbage_collect_histo.start_timer(); fail_point!("before-timeline-gc"); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 782e8f9e39..ea5f5f5fa7 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -30,7 +30,7 @@ use crate::{ task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, - tasks::{BackgroundLoopKind, RateLimitError}, + tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, }, @@ -158,15 +158,15 @@ impl Timeline { ) -> ControlFlow<()> { let now = SystemTime::now(); - let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit( + let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit( BackgroundLoopKind::Eviction, ctx, - cancel, - ) - .await - { - Ok(permit) => permit, - Err(RateLimitError::Cancelled) => return ControlFlow::Break(()), + ); + + let _permit = tokio::select! 
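The initialized semaphore driving await_initial_logical_size above (declared in logical_size.rs further down) starts at zero permits and is only ever topped up: add_permits(1) is the one-shot "done" signal, guaranteed by the scopeguard::defer! on every exit path, and each waiter briefly acquires and then drops the permit, which puts it straight back. A minimal sketch of the same barrier, assuming only tokio; names are illustrative:

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Zero permits: acquire() blocks until the producer signals completion.
    let initialized = Arc::new(Semaphore::new(0));

    let producer = {
        let initialized = Arc::clone(&initialized);
        tokio::spawn(async move {
            // ... compute the initial logical size here ...
            // Signal completion; the permit is never removed again, so every
            // later waiter's acquire() returns immediately.
            initialized.add_permits(1);
        })
    };

    // Waiter: acquire and immediately drop the permit, which returns it.
    let _ = initialized.acquire().await.expect("never closed");

    producer.await.unwrap();
}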
{ + permit = acquire_permit => permit, + _ = cancel.cancelled() => return ControlFlow::Break(()), + _ = self.cancel.cancelled() => return ControlFlow::Break(()), }; // If we evict layers but keep cached values derived from those layers, then diff --git a/pageserver/src/tenant/timeline/logical_size.rs b/pageserver/src/tenant/timeline/logical_size.rs index f2db8c91fc..03bc59ea38 100644 --- a/pageserver/src/tenant/timeline/logical_size.rs +++ b/pageserver/src/tenant/timeline/logical_size.rs @@ -34,6 +34,9 @@ pub(super) struct LogicalSize { pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore: OnceCell, + /// Once the initial logical size is initialized, this is notified. + pub(crate) initialized: tokio::sync::Semaphore, + /// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines. pub initial_part_end: Option, @@ -125,6 +128,7 @@ impl LogicalSize { initial_part_end: None, size_added_after_initial: AtomicI64::new(0), did_return_approximate_to_walreceiver: AtomicBool::new(false), + initialized: tokio::sync::Semaphore::new(0), } } @@ -135,6 +139,7 @@ impl LogicalSize { initial_part_end: Some(compute_to), size_added_after_initial: AtomicI64::new(0), did_return_approximate_to_walreceiver: AtomicBool::new(false), + initialized: tokio::sync::Semaphore::new(0), } } diff --git a/pgxn/neon/neon_utils.c b/pgxn/neon/neon_utils.c index 06faea7490..807d2decf6 100644 --- a/pgxn/neon/neon_utils.c +++ b/pgxn/neon/neon_utils.c @@ -1,3 +1,6 @@ + +#include + #include "postgres.h" #include "access/timeline.h" @@ -114,3 +117,25 @@ pq_sendint64_le(StringInfo buf, uint64 i) memcpy(buf->data + buf->len, &i, sizeof(uint64)); buf->len += sizeof(uint64); } + +/* + * Disables core dump for the current process. + */ +void +disable_core_dump() +{ + struct rlimit rlim; + +#ifdef WALPROPOSER_LIB /* skip in simulation mode */ + return; +#endif + + rlim.rlim_cur = 0; + rlim.rlim_max = 0; + if (setrlimit(RLIMIT_CORE, &rlim)) + { + int save_errno = errno; + + fprintf(stderr, "WARNING: disable cores setrlimit failed: %s", strerror(save_errno)); + } +} diff --git a/pgxn/neon/neon_utils.h b/pgxn/neon/neon_utils.h index e3fafc8d0f..20745d8b26 100644 --- a/pgxn/neon/neon_utils.h +++ b/pgxn/neon/neon_utils.h @@ -8,5 +8,6 @@ uint32 pq_getmsgint32_le(StringInfo msg); uint64 pq_getmsgint64_le(StringInfo msg); void pq_sendint32_le(StringInfo buf, uint32 i); void pq_sendint64_le(StringInfo buf, uint64 i); +extern void disable_core_dump(); #endif /* __NEON_UTILS_H__ */ diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7d9dbfdb7f..fc3332612c 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -35,6 +35,8 @@ * *------------------------------------------------------------------------- */ +#include + #include "postgres.h" #include "libpq/pqformat.h" #include "neon.h" @@ -1069,6 +1071,12 @@ DetermineEpochStartLsn(WalProposer *wp) if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term == walprop_shared->mineLastElectedTerm))) { + /* + * Panic to restart PG as we need to retake basebackup. + * However, don't dump core as this is kinda expected + * scenario. + */ + disable_core_dump(); walprop_log(PANIC, "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn), @@ -1445,7 +1453,12 @@ RecvAppendResponses(Safekeeper *sk) if (sk->appendResponse.term > wp->propTerm) { - /* Another compute with higher term is running. */ + /* + * Another compute with higher term is running. 
Panic to restart + * PG as we likely need to retake basebackup. However, don't dump + * core as this is kinda expected scenario. + */ + disable_core_dump(); walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", sk->host, sk->port, sk->appendResponse.term, wp->propTerm); diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index ba054b53eb..3b09e05bd2 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -11,7 +11,8 @@ use crate::auth::validate_password_and_exchange; use crate::console::errors::GetAuthInfoError; use crate::console::provider::AuthInfo; use crate::console::AuthSecret; -use crate::proxy::{handle_try_wake, retry_after, LatencyTimer}; +use crate::proxy::connect_compute::handle_try_wake; +use crate::proxy::retry::retry_after; use crate::scram; use crate::stream::Stream; use crate::{ @@ -22,6 +23,7 @@ use crate::{ provider::{CachedNodeInfo, ConsoleReqExtra}, Api, }, + metrics::LatencyTimer, stream, url, }; use futures::TryFutureExt; diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index ce52daf16c..5c394ec649 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -4,7 +4,7 @@ use crate::{ compute, config::AuthenticationConfig, console::AuthSecret, - proxy::LatencyTimer, + metrics::LatencyTimer, sasl, stream::{PqStream, Stream}, }; diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index abbd25008b..5dde514bca 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -4,7 +4,7 @@ use super::{ use crate::{ auth::{self, AuthFlow}, console::AuthSecret, - proxy::LatencyTimer, + metrics::LatencyTimer, sasl, stream::{self, Stream}, }; diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index 72149e8e29..c04769a199 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -1,9 +1,8 @@ //! User credentials used in authentication. 
use crate::{ - auth::password_hack::parse_endpoint_param, - error::UserFacingError, - proxy::{neon_options_str, NUM_CONNECTION_ACCEPTED_BY_SNI}, + auth::password_hack::parse_endpoint_param, error::UserFacingError, + metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::neon_options_str, }; use itertools::Itertools; use pq_proto::StartupMessageParams; diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index f5f7270bf4..a54ba56e43 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,9 +1,6 @@ use crate::{ - auth::parse_endpoint_param, - cancellation::CancelClosure, - console::errors::WakeComputeError, - error::UserFacingError, - proxy::{neon_option, NUM_DB_CONNECTIONS_GAUGE}, + auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError, + error::UserFacingError, metrics::NUM_DB_CONNECTIONS_GAUGE, proxy::neon_option, }; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index deab966d9e..8d399f26ea 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -21,7 +21,7 @@ pub mod errors { use crate::{ error::{io_error, UserFacingError}, http, - proxy::ShouldRetry, + proxy::retry::ShouldRetry, }; use thiserror::Error; diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 192252a0df..f748c9a41f 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -5,7 +5,7 @@ use super::{ errors::{ApiError, GetAuthInfoError, WakeComputeError}, ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo, }; -use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; +use crate::metrics::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; use crate::{auth::backend::ComputeUserInfo, compute, http, scram}; use async_trait::async_trait; use futures::TryFutureExt; diff --git a/proxy/src/http.rs b/proxy/src/http.rs index 09423eca77..59e1492ed4 100644 --- a/proxy/src/http.rs +++ b/proxy/src/http.rs @@ -13,7 +13,7 @@ pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio::time::Instant; use tracing::trace; -use crate::{proxy::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl}; +use crate::{metrics::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl}; use reqwest_middleware::RequestBuilder; /// This is the preferred way to create new http clients, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index a22600cbb3..2da1eaf482 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -16,6 +16,7 @@ pub mod console; pub mod error; pub mod http; pub mod logging; +pub mod metrics; pub mod parse; pub mod protocol2; pub mod proxy; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs new file mode 100644 index 0000000000..8e2a6105b1 --- /dev/null +++ b/proxy/src/metrics.rs @@ -0,0 +1,232 @@ +use ::metrics::{ + exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec, + IntCounterPairVec, IntCounterVec, +}; +use prometheus::{ + register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec, + IntGaugeVec, +}; + +use once_cell::sync::Lazy; +use tokio::time; + +pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "proxy_opened_db_connections_total", + "Number of opened connections to a database.", + "proxy_closed_db_connections_total", + "Number of closed connections to a database.", + &["protocol"], + ) + .unwrap() +}); + +pub static 
NUM_CLIENT_CONNECTION_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "proxy_opened_client_connections_total", + "Number of opened connections from a client.", + "proxy_closed_client_connections_total", + "Number of closed connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "proxy_accepted_connections_total", + "Number of client connections accepted.", + "proxy_closed_connections_total", + "Number of client connections closed.", + &["protocol"], + ) + .unwrap() +}); + +pub static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { + register_histogram_vec!( + "proxy_compute_connection_latency_seconds", + "Time it took for proxy to establish a connection to the compute endpoint", + // http/ws/tcp, true/false, true/false, success/failure + // 3 * 2 * 2 * 2 = 24 counters + &["protocol", "cache_miss", "pool_miss", "outcome"], + // largest bucket = 2^16 * 0.5ms = 32s + exponential_buckets(0.0005, 2.0, 16).unwrap(), + ) + .unwrap() +}); + +pub static CONSOLE_REQUEST_LATENCY: Lazy = Lazy::new(|| { + register_histogram_vec!( + "proxy_console_request_latency", + "Time it took for proxy to establish a connection to the compute endpoint", + // proxy_wake_compute/proxy_get_role_info + &["request"], + // largest bucket = 2^16 * 0.2ms = 13s + exponential_buckets(0.0002, 2.0, 16).unwrap(), + ) + .unwrap() +}); + +pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_allowed_ips_cache_misses", + "Number of cache hits/misses for allowed ips", + // hit/miss + &["outcome"], + ) + .unwrap() +}); + +pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy = Lazy::new(|| { + register_histogram!( + "proxy_control_plane_token_acquire_seconds", + "Time it took for proxy to establish a connection to the compute endpoint", + // largest bucket = 3^16 * 0.05ms = 2.15s + exponential_buckets(0.00005, 3.0, 16).unwrap(), + ) + .unwrap() +}); + +pub static RATE_LIMITER_LIMIT: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "semaphore_control_plane_limit", + "Current limit of the semaphore control plane", + &["limit"], // 2 counters + ) + .unwrap() +}); + +pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_accepted_connections_by_sni", + "Number of connections (per sni).", + &["kind"], + ) + .unwrap() +}); + +pub static ALLOWED_IPS_NUMBER: Lazy = Lazy::new(|| { + register_histogram!( + "proxy_allowed_ips_number", + "Number of allowed ips", + vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0], + ) + .unwrap() +}); + +pub struct LatencyTimer { + // time since the stopwatch was started + start: Option, + // accumulated time on the stopwatch + accumulated: std::time::Duration, + // label data + protocol: &'static str, + cache_miss: bool, + pool_miss: bool, + outcome: &'static str, +} + +pub struct LatencyTimerPause<'a> { + timer: &'a mut LatencyTimer, +} + +impl LatencyTimer { + pub fn new(protocol: &'static str) -> Self { + Self { + start: Some(time::Instant::now()), + accumulated: std::time::Duration::ZERO, + protocol, + cache_miss: false, + // by default we don't do pooling + pool_miss: true, + // assume failed unless otherwise specified + outcome: "failed", + } + } + + pub fn pause(&mut self) -> LatencyTimerPause<'_> { + // stop the stopwatch and record the time that we have accumulated + let start = self.start.take().expect("latency timer should be started"); + self.accumulated += 
start.elapsed(); + LatencyTimerPause { timer: self } + } + + pub fn cache_miss(&mut self) { + self.cache_miss = true; + } + + pub fn pool_hit(&mut self) { + self.pool_miss = false; + } + + pub fn success(mut self) { + self.outcome = "success"; + } +} + +impl Drop for LatencyTimerPause<'_> { + fn drop(&mut self) { + // start the stopwatch again + self.timer.start = Some(time::Instant::now()); + } +} + +impl Drop for LatencyTimer { + fn drop(&mut self) { + let duration = + self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated; + COMPUTE_CONNECTION_LATENCY + .with_label_values(&[ + self.protocol, + bool_to_str(self.cache_miss), + bool_to_str(self.pool_miss), + self.outcome, + ]) + .observe(duration.as_secs_f64()) + } +} + +pub static NUM_CONNECTION_FAILURES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_connection_failures_total", + "Number of connection failures (per kind).", + &["kind"], + ) + .unwrap() +}); + +pub static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_connection_failures_breakdown", + "Number of wake-up failures (per kind).", + &["retry", "kind"], + ) + .unwrap() +}); + +pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_io_bytes_per_client", + "Number of bytes sent/received between client and backend.", + crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS, + ) + .unwrap() +}); + +pub static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_io_bytes", + "Number of bytes sent/received between all clients and backends.", + &["direction"], + ) + .unwrap() +}); + +pub const fn bool_to_str(x: bool) -> &'static str { + if x { + "true" + } else { + "false" + } +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index da65065179..17e910860c 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -1,265 +1,41 @@ #[cfg(test)] mod tests; +pub mod connect_compute; +pub mod retry; + use crate::{ auth, cancellation::{self, CancelMap}, - compute::{self, PostgresConnection}, + compute, config::{AuthenticationConfig, ProxyConfig, TlsConfig}, - console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api}, - http::StatusCode, + console::{self, messages::MetricsAuxInfo}, + metrics::{ + LatencyTimer, NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER, + NUM_CLIENT_CONNECTION_GAUGE, NUM_CONNECTION_REQUESTS_GAUGE, + }, protocol2::WithClientIp, rate_limiter::EndpointRateLimiter, stream::{PqStream, Stream}, usage_metrics::{Ids, USAGE_METRICS}, }; use anyhow::{bail, Context}; -use async_trait::async_trait; use futures::TryFutureExt; use itertools::Itertools; -use metrics::{ - exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec, - IntCounterPairVec, IntCounterVec, -}; -use once_cell::sync::{Lazy, OnceCell}; +use once_cell::sync::OnceCell; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; -use prometheus::{ - register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec, - IntGaugeVec, -}; use regex::Regex; -use std::{error::Error, io, net::IpAddr, ops::ControlFlow, sync::Arc, time::Instant}; -use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, - time, -}; +use std::{net::IpAddr, sync::Arc}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{error, info, info_span, Instrument}; use 
utils::measured_stream::MeasuredStream; -/// Number of times we should retry the `/proxy_wake_compute` http request. -/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0 -pub const NUM_RETRIES_CONNECT: u32 = 16; -const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2); -const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25); -const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; +use self::connect_compute::{connect_to_compute, TcpMechanism}; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { - register_int_counter_pair_vec!( - "proxy_opened_db_connections_total", - "Number of opened connections to a database.", - "proxy_closed_db_connections_total", - "Number of closed connections to a database.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy = Lazy::new(|| { - register_int_counter_pair_vec!( - "proxy_opened_client_connections_total", - "Number of opened connections from a client.", - "proxy_closed_client_connections_total", - "Number of closed connections from a client.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy = Lazy::new(|| { - register_int_counter_pair_vec!( - "proxy_accepted_connections_total", - "Number of client connections accepted.", - "proxy_closed_connections_total", - "Number of client connections closed.", - &["protocol"], - ) - .unwrap() -}); - -static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { - register_histogram_vec!( - "proxy_compute_connection_latency_seconds", - "Time it took for proxy to establish a connection to the compute endpoint", - // http/ws/tcp, true/false, true/false, success/failure - // 3 * 2 * 2 * 2 = 24 counters - &["protocol", "cache_miss", "pool_miss", "outcome"], - // largest bucket = 2^16 * 0.5ms = 32s - exponential_buckets(0.0005, 2.0, 16).unwrap(), - ) - .unwrap() -}); - -pub static CONSOLE_REQUEST_LATENCY: Lazy = Lazy::new(|| { - register_histogram_vec!( - "proxy_console_request_latency", - "Time it took for proxy to establish a connection to the compute endpoint", - // proxy_wake_compute/proxy_get_role_info - &["request"], - // largest bucket = 2^16 * 0.2ms = 13s - exponential_buckets(0.0002, 2.0, 16).unwrap(), - ) - .unwrap() -}); - -pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_allowed_ips_cache_misses", - "Number of cache hits/misses for allowed ips", - // hit/miss - &["outcome"], - ) - .unwrap() -}); - -pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy = Lazy::new(|| { - register_histogram!( - "proxy_control_plane_token_acquire_seconds", - "Time it took for proxy to establish a connection to the compute endpoint", - // largest bucket = 3^16 * 0.05ms = 2.15s - exponential_buckets(0.00005, 3.0, 16).unwrap(), - ) - .unwrap() -}); - -pub static RATE_LIMITER_LIMIT: Lazy = Lazy::new(|| { - register_int_gauge_vec!( - "semaphore_control_plane_limit", - "Current limit of the semaphore control plane", - &["limit"], // 2 counters - ) - .unwrap() -}); - -pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_accepted_connections_by_sni", - "Number of connections (per sni).", - &["kind"], - ) - .unwrap() -}); - -pub static ALLOWED_IPS_NUMBER: Lazy = Lazy::new(|| { - register_histogram!( - "proxy_allowed_ips_number", 
- "Number of allowed ips", - vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0], - ) - .unwrap() -}); - -pub struct LatencyTimer { - // time since the stopwatch was started - start: Option, - // accumulated time on the stopwatch - accumulated: std::time::Duration, - // label data - protocol: &'static str, - cache_miss: bool, - pool_miss: bool, - outcome: &'static str, -} - -pub struct LatencyTimerPause<'a> { - timer: &'a mut LatencyTimer, -} - -impl LatencyTimer { - pub fn new(protocol: &'static str) -> Self { - Self { - start: Some(Instant::now()), - accumulated: std::time::Duration::ZERO, - protocol, - cache_miss: false, - // by default we don't do pooling - pool_miss: true, - // assume failed unless otherwise specified - outcome: "failed", - } - } - - pub fn pause(&mut self) -> LatencyTimerPause<'_> { - // stop the stopwatch and record the time that we have accumulated - let start = self.start.take().expect("latency timer should be started"); - self.accumulated += start.elapsed(); - LatencyTimerPause { timer: self } - } - - pub fn cache_miss(&mut self) { - self.cache_miss = true; - } - - pub fn pool_hit(&mut self) { - self.pool_miss = false; - } - - pub fn success(mut self) { - self.outcome = "success"; - } -} - -impl Drop for LatencyTimerPause<'_> { - fn drop(&mut self) { - // start the stopwatch again - self.timer.start = Some(Instant::now()); - } -} - -impl Drop for LatencyTimer { - fn drop(&mut self) { - let duration = - self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated; - COMPUTE_CONNECTION_LATENCY - .with_label_values(&[ - self.protocol, - bool_to_str(self.cache_miss), - bool_to_str(self.pool_miss), - self.outcome, - ]) - .observe(duration.as_secs_f64()) - } -} - -static NUM_CONNECTION_FAILURES: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_connection_failures_total", - "Number of connection failures (per kind).", - &["kind"], - ) - .unwrap() -}); - -static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_connection_failures_breakdown", - "Number of wake-up failures (per kind).", - &["retry", "kind"], - ) - .unwrap() -}); - -static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_io_bytes_per_client", - "Number of bytes sent/received between client and backend.", - crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS, - ) - .unwrap() -}); - -static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_io_bytes", - "Number of bytes sent/received between all clients and backends.", - &["direction"], - ) - .unwrap() -}); - pub async fn run_until_cancelled( f: F, cancellation_token: &CancellationToken, @@ -539,296 +315,6 @@ async fn handshake( } } -/// If we couldn't connect, a cached connection info might be to blame -/// (e.g. the compute node's address might've changed at the wrong time). -/// Invalidate the cache entry (if any) to prevent subsequent errors. -#[tracing::instrument(name = "invalidate_cache", skip_all)] -pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg { - let is_cached = node_info.cached(); - if is_cached { - warn!("invalidating stalled compute node info cache entry"); - } - let label = match is_cached { - true => "compute_cached", - false => "compute_uncached", - }; - NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); - - node_info.invalidate().config -} - -/// Try to connect to the compute node once. 
-#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)] -async fn connect_to_compute_once( - node_info: &console::CachedNodeInfo, - timeout: time::Duration, - proto: &'static str, -) -> Result { - let allow_self_signed_compute = node_info.allow_self_signed_compute; - - node_info - .config - .connect(allow_self_signed_compute, timeout, proto) - .await -} - -#[async_trait] -pub trait ConnectMechanism { - type Connection; - type ConnectError; - type Error: From; - async fn connect_once( - &self, - node_info: &console::CachedNodeInfo, - timeout: time::Duration, - ) -> Result; - - fn update_connect_config(&self, conf: &mut compute::ConnCfg); -} - -pub struct TcpMechanism<'a> { - /// KV-dictionary with PostgreSQL connection params. - pub params: &'a StartupMessageParams, - pub proto: &'static str, -} - -#[async_trait] -impl ConnectMechanism for TcpMechanism<'_> { - type Connection = PostgresConnection; - type ConnectError = compute::ConnectionError; - type Error = compute::ConnectionError; - - async fn connect_once( - &self, - node_info: &console::CachedNodeInfo, - timeout: time::Duration, - ) -> Result { - connect_to_compute_once(node_info, timeout, self.proto).await - } - - fn update_connect_config(&self, config: &mut compute::ConnCfg) { - config.set_startup_params(self.params); - } -} - -const fn bool_to_str(x: bool) -> &'static str { - if x { - "true" - } else { - "false" - } -} - -fn report_error(e: &WakeComputeError, retry: bool) { - use crate::console::errors::ApiError; - let retry = bool_to_str(retry); - let kind = match e { - WakeComputeError::BadComputeAddress(_) => "bad_compute_address", - WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error", - WakeComputeError::ApiError(ApiError::Console { - status: StatusCode::LOCKED, - ref text, - }) if text.contains("written data quota exceeded") - || text.contains("the limit for current plan reached") => - { - "quota_exceeded" - } - WakeComputeError::ApiError(ApiError::Console { - status: StatusCode::LOCKED, - .. - }) => "api_console_locked", - WakeComputeError::ApiError(ApiError::Console { - status: StatusCode::BAD_REQUEST, - .. - }) => "api_console_bad_request", - WakeComputeError::ApiError(ApiError::Console { status, .. }) - if status.is_server_error() => - { - "api_console_other_server_error" - } - WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error", - WakeComputeError::TimeoutError => "timeout_error", - }; - NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc(); -} - -/// Try to connect to the compute node, retrying if necessary. -/// This function might update `node_info`, so we take it by `&mut`. 
-#[tracing::instrument(skip_all)] -pub async fn connect_to_compute( - mechanism: &M, - mut node_info: console::CachedNodeInfo, - extra: &console::ConsoleReqExtra, - creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>, - mut latency_timer: LatencyTimer, -) -> Result -where - M::ConnectError: ShouldRetry + std::fmt::Debug, - M::Error: From, -{ - mechanism.update_connect_config(&mut node_info.config); - - // try once - let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { - Ok(res) => { - latency_timer.success(); - return Ok(res); - } - Err(e) => { - error!(error = ?e, "could not connect to compute node"); - (invalidate_cache(node_info), e) - } - }; - - latency_timer.cache_miss(); - - let mut num_retries = 1; - - // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node - info!("compute node's state has likely changed; requesting a wake-up"); - let node_info = loop { - let wake_res = match creds { - auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, - #[cfg(feature = "testing")] - auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, - // nothing to do? - auth::BackendType::Link(_) => return Err(err.into()), - // test backend - #[cfg(test)] - auth::BackendType::Test(x) => x.wake_compute(), - }; - - match handle_try_wake(wake_res, num_retries) { - Err(e) => { - error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); - report_error(&e, false); - return Err(e.into()); - } - // failed to wake up but we can continue to retry - Ok(ControlFlow::Continue(e)) => { - report_error(&e, true); - warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); - } - // successfully woke up a compute node and can break the wakeup loop - Ok(ControlFlow::Break(mut node_info)) => { - node_info.config.reuse_password(&config); - mechanism.update_connect_config(&mut node_info.config); - break node_info; - } - } - - let wait_duration = retry_after(num_retries); - num_retries += 1; - - time::sleep(wait_duration).await; - }; - - // now that we have a new node, try connect to it repeatedly. - // this can error for a few reasons, for instance: - // * DNS connection settings haven't quite propagated yet - info!("wake_compute success. attempting to connect"); - loop { - match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { - Ok(res) => { - latency_timer.success(); - return Ok(res); - } - Err(e) => { - let retriable = e.should_retry(num_retries); - if !retriable { - error!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); - return Err(e.into()); - } - warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); - } - } - - let wait_duration = retry_after(num_retries); - num_retries += 1; - - time::sleep(wait_duration).await; - } -} - -/// Attempts to wake up the compute node. -/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable -/// * Returns Ok(Break(node)) if the wakeup succeeded -/// * Returns Err(e) if there was an error -pub fn handle_try_wake( - result: Result, - num_retries: u32, -) -> Result, WakeComputeError> { - match result { - Err(err) => match &err { - WakeComputeError::ApiError(api) if api.should_retry(num_retries) => { - Ok(ControlFlow::Continue(err)) - } - _ => Err(err), - }, - // Ready to try again. 
- Ok(new) => Ok(ControlFlow::Break(new)), - } -} - -pub trait ShouldRetry { - fn could_retry(&self) -> bool; - fn should_retry(&self, num_retries: u32) -> bool { - match self { - _ if num_retries >= NUM_RETRIES_CONNECT => false, - err => err.could_retry(), - } - } -} - -impl ShouldRetry for io::Error { - fn could_retry(&self) -> bool { - use std::io::ErrorKind; - matches!( - self.kind(), - ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut - ) - } -} - -impl ShouldRetry for tokio_postgres::error::DbError { - fn could_retry(&self) -> bool { - use tokio_postgres::error::SqlState; - matches!( - self.code(), - &SqlState::CONNECTION_FAILURE - | &SqlState::CONNECTION_EXCEPTION - | &SqlState::CONNECTION_DOES_NOT_EXIST - | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, - ) - } -} - -impl ShouldRetry for tokio_postgres::Error { - fn could_retry(&self) -> bool { - if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) { - io::Error::could_retry(io_err) - } else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) { - tokio_postgres::error::DbError::could_retry(db_err) - } else { - false - } - } -} - -impl ShouldRetry for compute::ConnectionError { - fn could_retry(&self) -> bool { - match self { - compute::ConnectionError::Postgres(err) => err.could_retry(), - compute::ConnectionError::CouldNotConnect(err) => err.could_retry(), - _ => false, - } - } -} - -pub fn retry_after(num_retries: u32) -> time::Duration { - BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1)) -} - /// Finish client connection initialization: confirm auth success, send params, etc. #[tracing::instrument(skip_all)] async fn prepare_client_connection( diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs new file mode 100644 index 0000000000..88b0019c49 --- /dev/null +++ b/proxy/src/proxy/connect_compute.rs @@ -0,0 +1,238 @@ +use crate::{ + auth, + compute::{self, PostgresConnection}, + console::{self, errors::WakeComputeError, Api}, + metrics::{bool_to_str, LatencyTimer, NUM_CONNECTION_FAILURES, NUM_WAKEUP_FAILURES}, + proxy::retry::{retry_after, ShouldRetry}, +}; +use async_trait::async_trait; +use hyper::StatusCode; +use pq_proto::StartupMessageParams; +use std::ops::ControlFlow; +use tokio::time; +use tracing::{error, info, warn}; + +const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2); + +/// If we couldn't connect, a cached connection info might be to blame +/// (e.g. the compute node's address might've changed at the wrong time). +/// Invalidate the cache entry (if any) to prevent subsequent errors. +#[tracing::instrument(name = "invalidate_cache", skip_all)] +pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg { + let is_cached = node_info.cached(); + if is_cached { + warn!("invalidating stalled compute node info cache entry"); + } + let label = match is_cached { + true => "compute_cached", + false => "compute_uncached", + }; + NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); + + node_info.invalidate().config +} + +/// Try to connect to the compute node once. 
+#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)] +async fn connect_to_compute_once( + node_info: &console::CachedNodeInfo, + timeout: time::Duration, + proto: &'static str, +) -> Result { + let allow_self_signed_compute = node_info.allow_self_signed_compute; + + node_info + .config + .connect(allow_self_signed_compute, timeout, proto) + .await +} + +#[async_trait] +pub trait ConnectMechanism { + type Connection; + type ConnectError; + type Error: From; + async fn connect_once( + &self, + node_info: &console::CachedNodeInfo, + timeout: time::Duration, + ) -> Result; + + fn update_connect_config(&self, conf: &mut compute::ConnCfg); +} + +pub struct TcpMechanism<'a> { + /// KV-dictionary with PostgreSQL connection params. + pub params: &'a StartupMessageParams, + pub proto: &'static str, +} + +#[async_trait] +impl ConnectMechanism for TcpMechanism<'_> { + type Connection = PostgresConnection; + type ConnectError = compute::ConnectionError; + type Error = compute::ConnectionError; + + async fn connect_once( + &self, + node_info: &console::CachedNodeInfo, + timeout: time::Duration, + ) -> Result { + connect_to_compute_once(node_info, timeout, self.proto).await + } + + fn update_connect_config(&self, config: &mut compute::ConnCfg) { + config.set_startup_params(self.params); + } +} + +fn report_error(e: &WakeComputeError, retry: bool) { + use crate::console::errors::ApiError; + let retry = bool_to_str(retry); + let kind = match e { + WakeComputeError::BadComputeAddress(_) => "bad_compute_address", + WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + ref text, + }) if text.contains("written data quota exceeded") + || text.contains("the limit for current plan reached") => + { + "quota_exceeded" + } + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + .. + }) => "api_console_locked", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::BAD_REQUEST, + .. + }) => "api_console_bad_request", + WakeComputeError::ApiError(ApiError::Console { status, .. }) + if status.is_server_error() => + { + "api_console_other_server_error" + } + WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error", + WakeComputeError::TimeoutError => "timeout_error", + }; + NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc(); +} + +/// Try to connect to the compute node, retrying if necessary. +/// This function might update `node_info`, so we take it by `&mut`. 
+#[tracing::instrument(skip_all)] +pub async fn connect_to_compute( + mechanism: &M, + mut node_info: console::CachedNodeInfo, + extra: &console::ConsoleReqExtra, + creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>, + mut latency_timer: LatencyTimer, +) -> Result +where + M::ConnectError: ShouldRetry + std::fmt::Debug, + M::Error: From, +{ + mechanism.update_connect_config(&mut node_info.config); + + // try once + let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { + Ok(res) => { + latency_timer.success(); + return Ok(res); + } + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + (invalidate_cache(node_info), e) + } + }; + + latency_timer.cache_miss(); + + let mut num_retries = 1; + + // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node + info!("compute node's state has likely changed; requesting a wake-up"); + let node_info = loop { + let wake_res = match creds { + auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, + #[cfg(feature = "testing")] + auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, + // nothing to do? + auth::BackendType::Link(_) => return Err(err.into()), + // test backend + #[cfg(test)] + auth::BackendType::Test(x) => x.wake_compute(), + }; + + match handle_try_wake(wake_res, num_retries) { + Err(e) => { + error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); + report_error(&e, false); + return Err(e.into()); + } + // failed to wake up but we can continue to retry + Ok(ControlFlow::Continue(e)) => { + report_error(&e, true); + warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); + } + // successfully woke up a compute node and can break the wakeup loop + Ok(ControlFlow::Break(mut node_info)) => { + node_info.config.reuse_password(&config); + mechanism.update_connect_config(&mut node_info.config); + break node_info; + } + } + + let wait_duration = retry_after(num_retries); + num_retries += 1; + + time::sleep(wait_duration).await; + }; + + // now that we have a new node, try connect to it repeatedly. + // this can error for a few reasons, for instance: + // * DNS connection settings haven't quite propagated yet + info!("wake_compute success. attempting to connect"); + loop { + match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { + Ok(res) => { + latency_timer.success(); + return Ok(res); + } + Err(e) => { + let retriable = e.should_retry(num_retries); + if !retriable { + error!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); + return Err(e.into()); + } + warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); + } + } + + let wait_duration = retry_after(num_retries); + num_retries += 1; + + time::sleep(wait_duration).await; + } +} + +/// Attempts to wake up the compute node. +/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable +/// * Returns Ok(Break(node)) if the wakeup succeeded +/// * Returns Err(e) if there was an error +pub fn handle_try_wake( + result: Result, + num_retries: u32, +) -> Result, WakeComputeError> { + match result { + Err(err) => match &err { + WakeComputeError::ApiError(api) if api.should_retry(num_retries) => { + Ok(ControlFlow::Continue(err)) + } + _ => Err(err), + }, + // Ready to try again. 
+ Ok(new) => Ok(ControlFlow::Break(new)), + } +} diff --git a/proxy/src/proxy/retry.rs b/proxy/src/proxy/retry.rs new file mode 100644 index 0000000000..a85ed380b0 --- /dev/null +++ b/proxy/src/proxy/retry.rs @@ -0,0 +1,68 @@ +use crate::compute; +use std::{error::Error, io}; +use tokio::time; + +/// Number of times we should retry the `/proxy_wake_compute` http request. +/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0 +pub const NUM_RETRIES_CONNECT: u32 = 16; +const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25); +const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; + +pub trait ShouldRetry { + fn could_retry(&self) -> bool; + fn should_retry(&self, num_retries: u32) -> bool { + match self { + _ if num_retries >= NUM_RETRIES_CONNECT => false, + err => err.could_retry(), + } + } +} + +impl ShouldRetry for io::Error { + fn could_retry(&self) -> bool { + use std::io::ErrorKind; + matches!( + self.kind(), + ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut + ) + } +} + +impl ShouldRetry for tokio_postgres::error::DbError { + fn could_retry(&self) -> bool { + use tokio_postgres::error::SqlState; + matches!( + self.code(), + &SqlState::CONNECTION_FAILURE + | &SqlState::CONNECTION_EXCEPTION + | &SqlState::CONNECTION_DOES_NOT_EXIST + | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, + ) + } +} + +impl ShouldRetry for tokio_postgres::Error { + fn could_retry(&self) -> bool { + if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) { + io::Error::could_retry(io_err) + } else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) { + tokio_postgres::error::DbError::could_retry(db_err) + } else { + false + } + } +} + +impl ShouldRetry for compute::ConnectionError { + fn could_retry(&self) -> bool { + match self { + compute::ConnectionError::Postgres(err) => err.could_retry(), + compute::ConnectionError::CouldNotConnect(err) => err.could_retry(), + _ => false, + } + } +} + +pub fn retry_after(num_retries: u32) -> time::Duration { + BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1)) +} diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 4691abbfb9..3c483c59ee 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -2,10 +2,13 @@ mod mitm; +use super::connect_compute::ConnectMechanism; +use super::retry::ShouldRetry; use super::*; use crate::auth::backend::{ComputeUserInfo, TestBackend}; use crate::config::CertResolver; use crate::console::{CachedNodeInfo, NodeInfo}; +use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT}; use crate::{auth, http, sasl, scram}; use async_trait::async_trait; use rstest::rstest; @@ -423,7 +426,7 @@ impl ConnectMechanism for TestConnectMechanism { async fn connect_once( &self, _node_info: &console::CachedNodeInfo, - _timeout: time::Duration, + _timeout: std::time::Duration, ) -> Result { let mut counter = self.counter.lock().unwrap(); let action = self.sequence[*counter]; diff --git a/proxy/src/proxy/tests/mitm.rs b/proxy/src/proxy/tests/mitm.rs index 50b3034936..a0a84a1dc0 100644 --- a/proxy/src/proxy/tests/mitm.rs +++ b/proxy/src/proxy/tests/mitm.rs @@ -120,7 +120,7 @@ where struct PgFrame; impl Decoder for PgFrame { type Item = Bytes; - type Error = io::Error; + type Error = std::io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 5 { @@ -136,7 +136,7 @@ impl Decoder for PgFrame { } } impl Encoder for 
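retry_after above grows the wait by a factor of sqrt(2) per attempt, starting from 25ms, and ShouldRetry::should_retry cuts the sequence off after NUM_RETRIES_CONNECT (16) tries. A quick way to see the resulting schedule, as a self-contained sketch that re-derives the same formula rather than calling into retry.rs:

use std::time::Duration;

fn main() {
    // Same constants as retry.rs: 25ms base, sqrt(2) growth, 16 attempts max.
    let base = Duration::from_millis(25);
    for n in 1..=16u32 {
        let wait = base.mul_f64(std::f64::consts::SQRT_2.powi(n as i32 - 1));
        // n=1 -> 25ms, n=5 -> 100ms, n=9 -> 400ms, n=13 -> 1.6s, n=16 -> ~4.5s;
        // the waits across all 16 attempts add up to roughly 15 seconds.
        println!("retry {n:2}: wait {wait:?}");
    }
}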
PgFrame { - type Error = io::Error; + type Error = std::io::Error; fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.extend_from_slice(&item); diff --git a/proxy/src/rate_limiter/aimd.rs b/proxy/src/rate_limiter/aimd.rs index c6c532ae53..2c14a54a6c 100644 --- a/proxy/src/rate_limiter/aimd.rs +++ b/proxy/src/rate_limiter/aimd.rs @@ -33,39 +33,6 @@ impl Aimd { min_utilisation_threshold: config.aimd_min_utilisation_threshold, } } - - pub fn decrease_factor(self, factor: f32) -> Self { - assert!((0.5..1.0).contains(&factor)); - Self { - decrease_factor: factor, - ..self - } - } - - pub fn increase_by(self, increase: usize) -> Self { - assert!(increase > 0); - Self { - increase_by: increase, - ..self - } - } - - pub fn with_max_limit(self, max: usize) -> Self { - assert!(max > 0); - Self { - max_limit: max, - ..self - } - } - - /// A threshold below which the limit won't be increased. 0.5 = 50%. - pub fn with_min_utilisation_threshold(self, min_util: f32) -> Self { - assert!(min_util > 0. && min_util < 1.); - Self { - min_utilisation_threshold: min_util, - ..self - } - } } #[async_trait] diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index 87c1597ca9..a190b2cf8f 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -1,12 +1,16 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, +use std::{ + collections::hash_map::RandomState, + hash::BuildHasher, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use anyhow::bail; use dashmap::DashMap; use itertools::Itertools; -use rand::{thread_rng, Rng}; +use rand::{rngs::StdRng, Rng, SeedableRng}; use smol_str::SmolStr; use tokio::sync::{Mutex as AsyncMutex, Semaphore, SemaphorePermit}; use tokio::time::{timeout, Duration, Instant}; @@ -28,10 +32,11 @@ use super::{ // saw SNI, before doing TLS handshake. User-side error messages in that case // does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now // I went with a more expensive way that yields user-friendlier error messages. 
-pub struct EndpointRateLimiter { - map: DashMap>, +pub struct EndpointRateLimiter { + map: DashMap, Hasher>, info: &'static [RateBucketInfo], access_count: AtomicUsize, + rand: Mutex, } #[derive(Clone, Copy)] @@ -125,11 +130,18 @@ impl RateBucketInfo { impl EndpointRateLimiter { pub fn new(info: &'static [RateBucketInfo]) -> Self { + Self::new_with_rand_and_hasher(info, StdRng::from_entropy(), RandomState::new()) + } +} + +impl EndpointRateLimiter { + fn new_with_rand_and_hasher(info: &'static [RateBucketInfo], rand: R, hasher: S) -> Self { info!(buckets = ?info, "endpoint rate limiter"); Self { info, - map: DashMap::with_shard_amount(64), + map: DashMap::with_hasher_and_shard_amount(hasher, 64), access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request + rand: Mutex::new(rand), } } @@ -176,7 +188,9 @@ impl EndpointRateLimiter { self.map.len() ); let n = self.map.shards().len(); - let shard = thread_rng().gen_range(0..n); + // this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide + // (impossible, infact, unless we have 2048 threads) + let shard = self.rand.lock().unwrap().gen_range(0..n); self.map.shards()[shard].write().clear(); } } @@ -219,7 +233,6 @@ pub struct Token<'t> { #[derive(Debug, Clone, Copy)] pub struct LimiterState { limit: usize, - available: usize, in_flight: usize, } @@ -380,10 +393,10 @@ impl Limiter { } new_limit }; - crate::proxy::RATE_LIMITER_LIMIT + crate::metrics::RATE_LIMITER_LIMIT .with_label_values(&["expected"]) .set(new_limit as i64); - crate::proxy::RATE_LIMITER_LIMIT + crate::metrics::RATE_LIMITER_LIMIT .with_label_values(&["actual"]) .set(actual_limit as i64); self.limits.store(new_limit, Ordering::Release); @@ -397,11 +410,7 @@ impl Limiter { pub fn state(&self) -> LimiterState { let limit = self.limits.load(Ordering::Relaxed); let in_flight = self.in_flight.load(Ordering::Relaxed); - LimiterState { - limit, - available: limit.saturating_sub(in_flight), - in_flight, - } + LimiterState { limit, in_flight } } } @@ -414,13 +423,6 @@ impl<'t> Token<'t> { } } - #[cfg(test)] - pub fn set_latency(&mut self, latency: Duration) { - use std::ops::Sub; - - self.start = Instant::now().sub(latency); - } - pub fn forget(&mut self) { if let Some(permit) = self.permit.take() { permit.forget(); @@ -439,10 +441,6 @@ impl LimiterState { pub fn limit(&self) -> usize { self.limit } - /// The amount of concurrency available to use. - pub fn available(&self) -> usize { - self.available - } /// The number of jobs in flight. 
pub fn in_flight(&self) -> usize { self.in_flight @@ -472,7 +470,7 @@ impl reqwest_middleware::Middleware for Limiter { ) })?; info!(duration = ?start.elapsed(), "waiting for token to connect to the control plane"); - crate::proxy::RATE_LIMITER_ACQUIRE_LATENCY.observe(start.elapsed().as_secs_f64()); + crate::metrics::RATE_LIMITER_ACQUIRE_LATENCY.observe(start.elapsed().as_secs_f64()); match next.run(req, extensions).await { Ok(response) => { self.release(token, Some(Outcome::from_reqwest_response(&response))) @@ -490,9 +488,11 @@ impl reqwest_middleware::Middleware for Limiter { #[cfg(test)] mod tests { - use std::{pin::pin, task::Context, time::Duration}; + use std::{hash::BuildHasherDefault, pin::pin, task::Context, time::Duration}; use futures::{task::noop_waker_ref, Future}; + use rand::SeedableRng; + use rustc_hash::FxHasher; use smol_str::SmolStr; use tokio::time; @@ -690,4 +690,21 @@ mod tests { assert!(limiter.check(endpoint.clone())); } } + + #[tokio::test] + async fn test_rate_limits_gc() { + // fixed seeded random/hasher to ensure that the test is not flaky + let rand = rand::rngs::StdRng::from_seed([1; 32]); + let hasher = BuildHasherDefault::::default(); + + let limiter = EndpointRateLimiter::new_with_rand_and_hasher( + &RateBucketInfo::DEFAULT_SET, + rand, + hasher, + ); + for i in 0..1_000_000 { + limiter.check(format!("{i}").into()); + } + assert!(limiter.map.len() < 150_000); + } } diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index 870e9c1103..e358a0712f 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -13,8 +13,8 @@ pub use reqwest_middleware::{ClientWithMiddleware, Error}; pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio_util::task::TaskTracker; +use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE; use crate::protocol2::{ProxyProtocolAccept, WithClientIp}; -use crate::proxy::NUM_CLIENT_CONNECTION_GAUGE; use crate::rate_limiter::EndpointRateLimiter; use crate::{cancellation::CancelMap, config::ProxyConfig}; use futures::StreamExt; diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 69198d79d3..ab8903418b 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -24,13 +24,12 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus}; use crate::{ auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list}, console, - proxy::{neon_options, LatencyTimer, NUM_DB_CONNECTIONS_GAUGE}, + metrics::{LatencyTimer, NUM_DB_CONNECTIONS_GAUGE}, + proxy::{connect_compute::ConnectMechanism, neon_options}, usage_metrics::{Ids, MetricCounter, USAGE_METRICS}, }; use crate::{compute, config}; -use crate::proxy::ConnectMechanism; - use tracing::{error, warn, Span}; use tracing::{info, info_span, Instrument}; @@ -444,7 +443,7 @@ async fn connect_to_compute( .await? 
.context("missing cache entry from wake_compute")?; - crate::proxy::connect_to_compute( + crate::proxy::connect_compute::connect_to_compute( &TokioMechanism { conn_id, conn_info, diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 795ba819c1..307b085ce0 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -29,7 +29,7 @@ use utils::http::error::ApiError; use utils::http::json::json_response; use crate::config::HttpConfig; -use crate::proxy::NUM_CONNECTION_REQUESTS_GAUGE; +use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE; use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index cd6184cdee..071add3bca 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -27,15 +27,15 @@ use sync_wrapper::SyncWrapper; pin_project! { /// This is a wrapper around a [`WebSocketStream`] that /// implements [`AsyncRead`] and [`AsyncWrite`]. - pub struct WebSocketRw { + pub struct WebSocketRw { #[pin] - stream: SyncWrapper>, + stream: SyncWrapper>, bytes: Bytes, } } -impl WebSocketRw { - pub fn new(stream: WebSocketStream) -> Self { +impl WebSocketRw { + pub fn new(stream: WebSocketStream) -> Self { Self { stream: stream.into(), bytes: Bytes::new(), @@ -43,7 +43,7 @@ impl WebSocketRw { } } -impl AsyncWrite for WebSocketRw { +impl AsyncWrite for WebSocketRw { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -69,7 +69,7 @@ impl AsyncWrite for WebSocketRw { } } -impl AsyncRead for WebSocketRw { +impl AsyncRead for WebSocketRw { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -86,7 +86,7 @@ impl AsyncRead for WebSocketRw { } } -impl AsyncBufRead for WebSocketRw { +impl AsyncBufRead for WebSocketRw { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Please refer to poll_fill_buf's documentation. 
const EOF: Poll> = Poll::Ready(Ok(&[])); @@ -151,3 +151,60 @@ pub async fn serve_websocket( .await?; Ok(()) } + +#[cfg(test)] +mod tests { + use std::pin::pin; + + use futures::{SinkExt, StreamExt}; + use hyper_tungstenite::{ + tungstenite::{protocol::Role, Message}, + WebSocketStream, + }; + use tokio::{ + io::{duplex, AsyncReadExt, AsyncWriteExt}, + task::JoinSet, + }; + + use super::WebSocketRw; + + #[tokio::test] + async fn websocket_stream_wrapper_happy_path() { + let (stream1, stream2) = duplex(1024); + + let mut js = JoinSet::new(); + + js.spawn(async move { + let mut client = WebSocketStream::from_raw_socket(stream1, Role::Client, None).await; + + client + .send(Message::Binary(b"hello world".to_vec())) + .await + .unwrap(); + + let message = client.next().await.unwrap().unwrap(); + assert_eq!(message, Message::Binary(b"websockets are cool".to_vec())); + + client.close(None).await.unwrap(); + }); + + js.spawn(async move { + let mut rw = pin!(WebSocketRw::new( + WebSocketStream::from_raw_socket(stream2, Role::Server, None).await + )); + + let mut buf = vec![0; 1024]; + let n = rw.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..n], b"hello world"); + + rw.write_all(b"websockets are cool").await.unwrap(); + rw.flush().await.unwrap(); + + let n = rw.read_to_end(&mut buf).await.unwrap(); + assert_eq!(n, 0); + }); + + js.join_next().await.unwrap().unwrap(); + js.join_next().await.unwrap().unwrap(); + } +} diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml index e26f2c6d6b..fdae378d55 100644 --- a/s3_scrubber/Cargo.toml +++ b/s3_scrubber/Cargo.toml @@ -31,6 +31,7 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls" aws-config = { workspace = true, default-features = false, features = ["rustls", "sso"] } pageserver = { path = "../pageserver" } +pageserver_api = { path = "../libs/pageserver_api" } remote_storage = { path = "../libs/remote_storage" } tracing.workspace = true diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs index a15a908212..2acbb2352b 100644 --- a/s3_scrubber/src/checks.rs +++ b/s3_scrubber/src/checks.rs @@ -7,13 +7,12 @@ use utils::generation::Generation; use crate::cloud_admin_api::BranchData; use crate::metadata_stream::stream_listing; -use crate::{download_object_with_retries, RootTarget}; +use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId}; use futures_util::{pin_mut, StreamExt}; use pageserver::tenant::remote_timeline_client::parse_remote_index_path; use pageserver::tenant::storage_layer::LayerFileName; use pageserver::tenant::IndexPart; use remote_storage::RemotePath; -use utils::id::TenantTimelineId; pub(crate) struct TimelineAnalysis { /// Anomalies detected @@ -39,8 +38,8 @@ impl TimelineAnalysis { } } -pub(crate) async fn branch_cleanup_and_check_errors( - id: &TenantTimelineId, +pub(crate) fn branch_cleanup_and_check_errors( + id: &TenantShardTimelineId, s3_root: &RootTarget, s3_active_branch: Option<&BranchData>, console_branch: Option, @@ -238,7 +237,7 @@ fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), St pub(crate) async fn list_timeline_blobs( s3_client: &Client, - id: TenantTimelineId, + id: TenantShardTimelineId, s3_root: &RootTarget, ) -> anyhow::Result { let mut s3_layers = HashSet::new(); diff --git a/s3_scrubber/src/garbage.rs b/s3_scrubber/src/garbage.rs index f27e1d7f65..7192afb91b 100644 --- a/s3_scrubber/src/garbage.rs +++ b/s3_scrubber/src/garbage.rs @@ -10,15 +10,16 @@ use aws_sdk_s3::{ Client, }; use futures_util::{pin_mut, 
TryStreamExt}; +use pageserver_api::shard::TenantShardId; use serde::{Deserialize, Serialize}; use tokio_stream::StreamExt; -use utils::id::{TenantId, TenantTimelineId}; +use utils::id::TenantId; use crate::{ cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData}, init_remote, metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants}, - BucketConfig, ConsoleConfig, NodeKind, RootTarget, TraversingDepth, + BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth, }; #[derive(Serialize, Deserialize, Debug)] @@ -29,8 +30,8 @@ enum GarbageReason { #[derive(Serialize, Deserialize, Debug)] enum GarbageEntity { - Tenant(TenantId), - Timeline(TenantTimelineId), + Tenant(TenantShardId), + Timeline(TenantShardTimelineId), } #[derive(Serialize, Deserialize, Debug)] @@ -142,6 +143,9 @@ async fn find_garbage_inner( console_projects.len() ); + // TODO(sharding): batch calls into Console so that we only call once for each TenantId, + // rather than checking the same TenantId for multiple TenantShardId + // Enumerate Tenants in S3, and check if each one exists in Console tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket); let tenants = stream_tenants(&s3_client, &target); @@ -149,10 +153,10 @@ async fn find_garbage_inner( let api_client = cloud_admin_api_client.clone(); let console_projects = &console_projects; async move { - match console_projects.get(&t) { + match console_projects.get(&t.tenant_id) { Some(project_data) => Ok((t, Some(project_data.clone()))), None => api_client - .find_tenant_project(t) + .find_tenant_project(t.tenant_id) .await .map_err(|e| anyhow::anyhow!(e)) .map(|r| (t, r)), @@ -166,21 +170,21 @@ async fn find_garbage_inner( // checks if they are enabled by the `depth` parameter. pin_mut!(tenants_checked); let mut garbage = GarbageList::new(node_kind, bucket_config); - let mut active_tenants: Vec = vec![]; + let mut active_tenants: Vec = vec![]; let mut counter = 0; while let Some(result) = tenants_checked.next().await { - let (tenant_id, console_result) = result?; + let (tenant_shard_id, console_result) = result?; // Paranoia check if let Some(project) = &console_result { - assert!(project.tenant == tenant_id); + assert!(project.tenant == tenant_shard_id.tenant_id); } - if garbage.maybe_append(GarbageEntity::Tenant(tenant_id), console_result) { - tracing::debug!("Tenant {tenant_id} is garbage"); + if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) { + tracing::debug!("Tenant {tenant_shard_id} is garbage"); } else { - tracing::debug!("Tenant {tenant_id} is active"); - active_tenants.push(tenant_id); + tracing::debug!("Tenant {tenant_shard_id} is active"); + active_tenants.push(tenant_shard_id); } counter += 1; @@ -266,13 +270,13 @@ impl std::fmt::Display for PurgeMode { pub async fn get_tenant_objects( s3_client: &Arc, target: RootTarget, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, ) -> anyhow::Result> { - tracing::debug!("Listing objects in tenant {tenant_id}"); + tracing::debug!("Listing objects in tenant {tenant_shard_id}"); // TODO: apply extra validation based on object modification time. Don't purge // tenants where any timeline's index_part.json has been touched recently. - let mut tenant_root = target.tenant_root(&tenant_id); + let mut tenant_root = target.tenant_root(&tenant_shard_id); // Remove delimiter, so that object listing lists all keys in the prefix and not just // common prefixes. 
@@ -285,7 +289,7 @@ pub async fn get_tenant_objects( pub async fn get_timeline_objects( s3_client: &Arc<Client>, target: RootTarget, - ttid: TenantTimelineId, + ttid: TenantShardTimelineId, ) -> anyhow::Result> { tracing::debug!("Listing objects in timeline {ttid}"); let mut timeline_root = target.timeline_root(&ttid); diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs index 6607db21e6..d2338c21e5 100644 --- a/s3_scrubber/src/lib.rs +++ b/s3_scrubber/src/lib.rs @@ -22,6 +22,7 @@ use aws_sdk_s3::{Client, Config}; use clap::ValueEnum; use pageserver::tenant::TENANTS_SEGMENT_NAME; +use pageserver_api::shard::TenantShardId; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::io::IsTerminal; @@ -29,7 +30,7 @@ use tokio::io::AsyncReadExt; use tracing::error; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use utils::id::{TenantId, TenantTimelineId}; +use utils::id::TimelineId; const MAX_RETRIES: usize = 20; const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN"; @@ -44,6 +45,35 @@ pub struct S3Target { pub delimiter: String, } +/// Convenience for referring to timelines within a particular shard: more ergonomic +/// than using a 2-tuple. +/// +/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather +/// than somewhere more broadly exposed, because this kind of thing is rarely needed +/// in the pageserver, as all timeline objects exist in the scope of a particular +/// tenant: the scrubber is different in that it handles collections of data referring to many +/// TenantShardTimelineIds in one place. +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct TenantShardTimelineId { + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, +} + +impl TenantShardTimelineId { + fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self { + Self { + tenant_shard_id, + timeline_id, + } + } +} + +impl Display for TenantShardTimelineId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id) + } +} + #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)] pub enum TraversingDepth { Tenant, @@ -110,19 +140,19 @@ impl RootTarget { } } - pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target { + pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target { self.tenants_root().with_sub_segment(&tenant_id.to_string()) } - pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target { + pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target { match self { Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"), Self::Safekeeper(_) => self.tenant_root(tenant_id), } } - pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target { - self.timelines_root(&id.tenant_id) + pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target { + self.timelines_root(&id.tenant_shard_id) .with_sub_segment(&id.timeline_id.to_string()) } diff --git a/s3_scrubber/src/metadata_stream.rs b/s3_scrubber/src/metadata_stream.rs index 4cfa77cfc1..073f37f319 100644 --- a/s3_scrubber/src/metadata_stream.rs +++ b/s3_scrubber/src/metadata_stream.rs @@ -3,14 +3,15 @@ use async_stream::{stream, try_stream}; use aws_sdk_s3::{types::ObjectIdentifier, Client}; use tokio_stream::Stream; -use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId}; -use utils::id::{TenantTimelineId, TimelineId}; +use crate::{list_objects_with_retries,
RootTarget, S3Target, TenantShardTimelineId}; +use pageserver_api::shard::TenantShardId; +use utils::id::TimelineId; /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2 pub fn stream_tenants<'a>( s3_client: &'a Client, target: &'a RootTarget, -) -> impl Stream> + 'a { +) -> impl Stream> + 'a { try_stream! { let mut continuation_token = None; let tenants_target = target.tenants_root(); @@ -44,14 +45,14 @@ pub fn stream_tenants<'a>( } } -/// Given a TenantId, output a stream of the timelines within that tenant, discovered +/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered /// using ListObjectsv2. The listing is done before the stream is built, so that this /// function can be used to generate concurrency on a stream using buffer_unordered. pub async fn stream_tenant_timelines<'a>( s3_client: &'a Client, target: &'a RootTarget, - tenant: TenantId, -) -> anyhow::Result> + 'a> { + tenant: TenantShardId, +) -> anyhow::Result> + 'a> { let mut timeline_ids: Vec> = Vec::new(); let mut continuation_token = None; let timelines_target = target.timelines_root(&tenant); @@ -98,7 +99,7 @@ pub async fn stream_tenant_timelines<'a>( Ok(stream! { for i in timeline_ids { let id = i?; - yield Ok(TenantTimelineId::new(tenant, id)); + yield Ok(TenantShardTimelineId::new(tenant, id)); } }) } diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs index 228f8d6763..91347ca21b 100644 --- a/s3_scrubber/src/scan_metadata.rs +++ b/s3_scrubber/src/scan_metadata.rs @@ -5,20 +5,19 @@ use crate::checks::{ TimelineAnalysis, }; use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; -use crate::{init_remote, BucketConfig, NodeKind, RootTarget}; +use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId}; use aws_sdk_s3::Client; use futures_util::{pin_mut, StreamExt, TryStreamExt}; use histogram::Histogram; use pageserver::tenant::IndexPart; use serde::Serialize; -use utils::id::TenantTimelineId; #[derive(Serialize)] pub struct MetadataSummary { count: usize, - with_errors: HashSet, - with_warnings: HashSet, - with_garbage: HashSet, + with_errors: HashSet, + with_warnings: HashSet, + with_garbage: HashSet, indices_by_version: HashMap, layer_count: MinMaxHisto, @@ -132,7 +131,7 @@ impl MetadataSummary { } } - fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) { + fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) { if !analysis.errors.is_empty() { self.with_errors.insert(*id); } @@ -199,8 +198,8 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> { + ttid: TenantShardTimelineId, + ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> { let data = list_timeline_blobs(s3_client, ttid, target).await?; Ok((ttid, data)) } @@ -213,8 +212,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result Generator[NegativeTests, N TenantId(t["id"]) for t in ps_http.tenant_list() ], "tenant should not be attached after negative test" - env.pageserver.allowed_errors.append(".*Error processing HTTP request: Bad request") + env.pageserver.allowed_errors.extend( + [ + # This fixture detaches the tenant, and tests using it will tend to re-attach it + # shortly after. 
There may be un-processed deletion_queue validations from the + # initial attachment + ".*Dropped remote consistent LSN updates.*", + # This fixture is for tests that will intentionally generate 400 responses + ".*Error processing HTTP request: Bad request", + ] + ) def log_contains_bad_request(): env.pageserver.log_contains(".*Error processing HTTP request: Bad request") diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 53eeb8bbe9..4da0ba7b20 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -20,7 +20,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder): env.pageserver.allowed_errors.extend( [ - ".*layer loading failed:.*", + ".*get_value_reconstruct_data for layer .*", ".*could not find data for key.*", ".*is not active. Current state: Broken.*", ".*will not become active. Current state: Broken.*", @@ -83,7 +83,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder): # (We don't check layer file contents on startup, when loading the timeline) # # This will change when we implement checksums for layers - with pytest.raises(Exception, match="layer loading failed:") as err: + with pytest.raises(Exception, match="get_value_reconstruct_data for layer ") as err: pg2.start() log.info( f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}" ) diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 24cbe34457..6e510b2eba 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -300,7 +300,8 @@ def test_timeline_initial_logical_size_calculation_cancellation( env = neon_env_builder.init_start() client = env.pageserver.http_client() - tenant_id, timeline_id = env.neon_cli.create_tenant() + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline # load in some data endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) @@ -732,3 +733,142 @@ def wait_for_timeline_size_init( raise Exception( f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}" ) + + +def test_ondemand_activation(neon_env_builder: NeonEnvBuilder): + """ + Tenants warming up opportunistically will wait for one another's logical size calculations to complete + before proceeding. However, they skip this if a client is actively trying to access them. + + This test is not purely about logical sizes, but logical size calculation is the phase that we + use as a proxy for "warming up" in this test: it happens within the semaphore guard used + to limit concurrent tenant warm-up. + """ + + # We will run with the limit set to 1, so that once we have one tenant stuck + # in a pausable failpoint, the rest are prevented from proceeding through warmup.
+ neon_env_builder.pageserver_config_override = "concurrent_tenant_warmup = '1'" + + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # Create some tenants + n_tenants = 10 + tenant_ids = {env.initial_tenant} + for _i in range(0, n_tenants - 1): + tenant_id = TenantId.generate() + env.pageserver.tenant_create(tenant_id) + + # Empty tenants are not subject to waiting for logical size calculations, because + # those happen at the timeline level + timeline_id = TimelineId.generate() + env.neon_cli.create_timeline( + new_branch_name="main", tenant_id=tenant_id, timeline_id=timeline_id + ) + + tenant_ids.add(tenant_id) + + # Restart pageserver with logical size calculations paused + env.pageserver.stop() + env.pageserver.start( + extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"} + ) + + def get_tenant_states(): + states = {} + for tenant_id in tenant_ids: + tenant = pageserver_http.tenant_status(tenant_id=tenant_id) + states[tenant_id] = tenant["state"]["slug"] + log.info(f"Tenant states: {states}") + return states + + def at_least_one_active(): + assert "Active" in set(get_tenant_states().values()) + + # One tenant should activate, then get stuck in its logical size calculation + wait_until(10, 1, at_least_one_active) + + # Wait some walltime to gain confidence that other tenants really are stuck and not proceeding to activate + time.sleep(5) + + # We should see one tenant win the activation race, and enter logical size calculation. The rest + # will stay in Attaching state, waiting for the "warmup_limit" semaphore + expect_activated = 1 + states = get_tenant_states() + assert len([s for s in states.values() if s == "Active"]) == expect_activated + assert len([s for s in states.values() if s == "Attaching"]) == n_tenants - expect_activated + + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants + ) + + # This is zero, and subsequent checks are expect_activated - 1, because this counter does not + # count how many tenants are Active, it counts how many have finished warmup. The first tenant + # that reached Active is still stuck in its logical size calculation, and has therefore not finished warmup. + assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == 0 + + # If a client accesses one of the blocked tenants, it should skip waiting for warmup and + # go active as fast as it can.
+ stuck_tenant_id = list( + [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"] + )[0][0] + + endpoint = env.endpoints.create_start(branch_name="main", tenant_id=stuck_tenant_id) + endpoint.safe_psql_many( + [ + "CREATE TABLE foo (x INTEGER)", + "INSERT INTO foo SELECT g FROM generate_series(1, 10) g", + ] + ) + endpoint.stop() + + # That one that we successfully accessed is now Active + expect_activated += 1 + assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active" + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") + == expect_activated - 1 + ) + + # The ones we didn't touch are still in Attaching + assert ( + len([s for s in get_tenant_states().values() if s == "Attaching"]) + == n_tenants - expect_activated + ) + + # Timeline creation operations also wake up Attaching tenants + stuck_tenant_id = list( + [(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"] + )[0][0] + pageserver_http.timeline_create(env.pg_version, stuck_tenant_id, TimelineId.generate()) + expect_activated += 1 + assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active" + assert ( + len([s for s in get_tenant_states().values() if s == "Attaching"]) + == n_tenants - expect_activated + ) + + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") + == expect_activated - 1 + ) + + # When we unblock logical size calculation, all tenants should proceed to active state via + # the warmup route. + pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off")) + + def all_active(): + assert all(s == "Active" for s in get_tenant_states().values()) + + wait_until(10, 1, all_active) + + # Final control check: restarting with no failpoints at all results in all tenants coming active + # without being prompted by client I/O + env.pageserver.stop() + env.pageserver.start() + wait_until(10, 1, all_active) + + assert ( + pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants + ) + assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == n_tenants
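Note on the new `test_rate_limits_gc` added in proxy/src/rate_limiter above: it avoids flakiness by pinning both sources of nondeterminism, the RNG seed and the hash builder, which is why it constructs the limiter through `new_with_rand_and_hasher` instead of the default constructor. Below is a minimal standalone sketch of that pattern, not part of the patch: it assumes the `rand` 0.8 and `rustc-hash` crates, and the toy `HashMap` stands in for the limiter's internal map purely for illustration.

```rust
// Sketch of deterministic-test setup: fixed-seed StdRng + fixed FxHasher-based
// BuildHasher, so randomized inserts and hash layout are identical on every run.
use std::collections::HashMap;
use std::hash::BuildHasherDefault;

use rand::{rngs::StdRng, Rng, SeedableRng};
use rustc_hash::FxHasher;

fn main() {
    // Same seed shape as the test ([1; 32]): every run draws the same key sequence.
    let mut rng = StdRng::from_seed([1; 32]);

    // Fixed hasher: no per-process SipHash keys, so bucket layout is stable too.
    let mut map: HashMap<u64, u64, BuildHasherDefault<FxHasher>> = HashMap::default();

    for _ in 0..1_000 {
        let key: u64 = rng.gen_range(0..200);
        *map.entry(key).or_insert(0) += 1;
    }

    // With fixed seed and hasher this count is reproducible, so an upper-bound
    // assertion (like the `limiter.map.len() < 150_000` check) cannot flake.
    println!("distinct keys: {}", map.len());
    assert!(map.len() <= 200);
}
```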