diff --git a/Cargo.lock b/Cargo.lock index fe3be55702..a647568f28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3104,6 +3104,7 @@ dependencies = [ "humantime-serde", "hyper", "itertools", + "md5", "metrics", "nix 0.26.2", "num-traits", diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index b24f5f399a..f639c6cdc6 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -833,7 +833,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re } else { None }; - endpoint.reconfigure(pageserver_id)?; + endpoint.reconfigure(pageserver_id).await?; } "stop" => { let endpoint_id = sub_args diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 3960de3beb..071f22dc2b 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -659,7 +659,7 @@ impl Endpoint { } } - pub fn reconfigure(&self, pageserver_id: Option) -> Result<()> { + pub async fn reconfigure(&self, pageserver_id: Option) -> Result<()> { let mut spec: ComputeSpec = { let spec_path = self.endpoint_path().join("spec.json"); let file = std::fs::File::open(spec_path)?; @@ -688,7 +688,7 @@ impl Endpoint { spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}")); } - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); let response = client .post(format!( "http://{}:{}/configure", @@ -699,14 +699,15 @@ impl Endpoint { "{{\"spec\":{}}}", serde_json::to_string_pretty(&spec)? )) - .send()?; + .send() + .await?; let status = response.status(); if !(status.is_client_error() || status.is_server_error()) { Ok(()) } else { let url = response.url().to_owned(); - let msg = match response.text() { + let msg = match response.text().await { Ok(err_body) => format!("Error: {}", err_body), Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), }; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 44ca8da233..f230973cd0 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -368,6 +368,7 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'gc_feedback' as bool")?, + heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), }; let request = models::TenantCreateRequest { @@ -452,6 +453,7 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'gc_feedback' as bool")?, + heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), } }; diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs index 89ddcc19e5..79df108896 100644 --- a/control_plane/src/tenant_migration.rs +++ b/control_plane/src/tenant_migration.rs @@ -155,7 +155,7 @@ pub async fn migrate_tenant( "🔁 Reconfiguring endpoint {} to use pageserver {}", endpoint_name, dest_ps.conf.id ); - endpoint.reconfigure(Some(dest_ps.conf.id))?; + endpoint.reconfigure(Some(dest_ps.conf.id)).await?; } } diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index ed375a152f..d09ba11344 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -3,8 +3,11 @@ //! Otherwise, we might not see all metrics registered via //! a default registry. 
 #![deny(clippy::undocumented_unsafe_blocks)]
+
 use once_cell::sync::Lazy;
-use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
+use prometheus::core::{
+    Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
+};
 pub use prometheus::opts;
 pub use prometheus::register;
 pub use prometheus::Error;
@@ -132,3 +135,137 @@ fn get_rusage_stats() -> libc::rusage {
         rusage.assume_init()
     }
 }
+
+/// Create an [`IntCounterPairVec`] and register it with the default registry.
+#[macro_export(local_inner_macros)]
+macro_rules! register_int_counter_pair_vec {
+    ($NAME1:expr, $HELP1:expr, $NAME2:expr, $HELP2:expr, $LABELS_NAMES:expr $(,)?) => {{
+        match (
+            $crate::register_int_counter_vec!($NAME1, $HELP1, $LABELS_NAMES),
+            $crate::register_int_counter_vec!($NAME2, $HELP2, $LABELS_NAMES),
+        ) {
+            (Ok(inc), Ok(dec)) => Ok($crate::IntCounterPairVec::new(inc, dec)),
+            (Err(e), _) | (_, Err(e)) => Err(e),
+        }
+    }};
+}
+/// Create an [`IntCounterPair`] and register it with the default registry.
+#[macro_export(local_inner_macros)]
+macro_rules! register_int_counter_pair {
+    ($NAME1:expr, $HELP1:expr, $NAME2:expr, $HELP2:expr $(,)?) => {{
+        match (
+            $crate::register_int_counter!($NAME1, $HELP1),
+            $crate::register_int_counter!($NAME2, $HELP2),
+        ) {
+            (Ok(inc), Ok(dec)) => Ok($crate::IntCounterPair::new(inc, dec)),
+            (Err(e), _) | (_, Err(e)) => Err(e),
+        }
+    }};
+}
+
+/// A pair of [`GenericCounterVec`]s. Like a [`GenericGaugeVec`], but will always observe changes.
+pub struct GenericCounterPairVec<P: Atomic> {
+    inc: GenericCounterVec<P>,
+    dec: GenericCounterVec<P>,
+}
+
+/// A pair of [`GenericCounter`]s. Like a [`GenericGauge`], but will always observe changes.
+pub struct GenericCounterPair<P: Atomic> {
+    inc: GenericCounter<P>,
+    dec: GenericCounter<P>,
+}
+
+impl<P: Atomic> GenericCounterPairVec<P> {
+    pub fn new(inc: GenericCounterVec<P>, dec: GenericCounterVec<P>) -> Self {
+        Self { inc, dec }
+    }
+
+    /// `get_metric_with_label_values` returns the [`GenericCounterPair<P>`] for the given slice
+    /// of label values (same order as the VariableLabels in Desc). If that combination of
+    /// label values is accessed for the first time, a new [`GenericCounterPair<P>`] is created.
+    ///
+    /// An error is returned if the number of label values is not the same as the
+    /// number of VariableLabels in Desc.
+    pub fn get_metric_with_label_values(&self, vals: &[&str]) -> Result<GenericCounterPair<P>> {
+        Ok(GenericCounterPair {
+            inc: self.inc.get_metric_with_label_values(vals)?,
+            dec: self.dec.get_metric_with_label_values(vals)?,
+        })
+    }
+
+    /// `with_label_values` works as `get_metric_with_label_values`, but panics if an error
+    /// occurs.
+    pub fn with_label_values(&self, vals: &[&str]) -> GenericCounterPair<P> {
+        self.get_metric_with_label_values(vals).unwrap()
+    }
+}
+
+impl<P: Atomic> GenericCounterPair<P> {
+    pub fn new(inc: GenericCounter<P>, dec: GenericCounter<P>) -> Self {
+        Self { inc, dec }
+    }
+
+    /// Increment the gauge by 1, returning a guard that decrements by 1 on drop.
+    pub fn guard(&self) -> GenericCounterPairGuard<P> {
+        self.inc.inc();
+        GenericCounterPairGuard(self.dec.clone())
+    }
+
+    /// Increment the gauge by n, returning a guard that decrements by n on drop.
+    pub fn guard_by(&self, n: P::T) -> GenericCounterPairGuardBy<P> {
+        self.inc.inc_by(n);
+        GenericCounterPairGuardBy(self.dec.clone(), n)
+    }
+
+    /// Increase the gauge by 1.
+    #[inline]
+    pub fn inc(&self) {
+        self.inc.inc();
+    }
+
+    /// Decrease the gauge by 1.
+    #[inline]
+    pub fn dec(&self) {
+        self.dec.inc();
+    }
+
+    /// Add the given value to the gauge. (The value can be
+    /// negative, resulting in a decrement of the gauge.)
+    #[inline]
+    pub fn inc_by(&self, v: P::T) {
+        self.inc.inc_by(v);
+    }
+
+    /// Subtract the given value from the gauge. (The value can be
+    /// negative, resulting in an increment of the gauge.)
+    #[inline]
+    pub fn dec_by(&self, v: P::T) {
+        self.dec.inc_by(v);
+    }
+}
+
+/// Guard returned by [`GenericCounterPair::guard`]
+pub struct GenericCounterPairGuard<P: Atomic>(GenericCounter<P>);
+
+impl<P: Atomic> Drop for GenericCounterPairGuard<P> {
+    fn drop(&mut self) {
+        self.0.inc();
+    }
+}
+/// Guard returned by [`GenericCounterPair::guard_by`]
+pub struct GenericCounterPairGuardBy<P: Atomic>(GenericCounter<P>, P::T);
+
+impl<P: Atomic> Drop for GenericCounterPairGuardBy<P>
{ + fn drop(&mut self) { + self.0.inc_by(self.1); + } +} + +/// A Pair of [`IntCounterVec`]s. Like an [`IntGaugeVec`] but will always observe changes +pub type IntCounterPairVec = GenericCounterPairVec; + +/// A Pair of [`IntCounter`]s. Like an [`IntGauge`] but will always observe changes +pub type IntCounterPair = GenericCounterPair; + +/// A guard for [`IntCounterPair`] that will decrement the gauge on drop +pub type IntCounterPairGuard = GenericCounterPairGuard; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a3029e67a5..fbc7d73235 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -237,6 +237,7 @@ pub struct TenantConfig { pub min_resident_size_override: Option, pub evictions_low_residence_duration_metric_threshold: Option, pub gc_feedback: Option, + pub heatmap_period: Option, } /// A flattened analog of a `pagesever::tenant::LocationMode`, which diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 2f09c2f3ea..f7b73dc984 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use anyhow::Context; +use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, EnumVariantNames}; @@ -24,16 +25,48 @@ impl LogFormat { } } -static TRACING_EVENT_COUNT: Lazy = Lazy::new(|| { - metrics::register_int_counter_vec!( +struct TracingEventCountMetric { + error: IntCounter, + warn: IntCounter, + info: IntCounter, + debug: IntCounter, + trace: IntCounter, +} + +static TRACING_EVENT_COUNT_METRIC: Lazy = Lazy::new(|| { + let vec = metrics::register_int_counter_vec!( "libmetrics_tracing_event_count", "Number of tracing events, by level", &["level"] ) - .expect("failed to define metric") + .expect("failed to define metric"); + TracingEventCountMetric::new(vec) }); -struct TracingEventCountLayer(&'static metrics::IntCounterVec); +impl TracingEventCountMetric { + fn new(vec: IntCounterVec) -> Self { + Self { + error: vec.with_label_values(&["error"]), + warn: vec.with_label_values(&["warn"]), + info: vec.with_label_values(&["info"]), + debug: vec.with_label_values(&["debug"]), + trace: vec.with_label_values(&["trace"]), + } + } + + fn inc_for_level(&self, level: tracing::Level) { + let counter = match level { + tracing::Level::ERROR => &self.error, + tracing::Level::WARN => &self.warn, + tracing::Level::INFO => &self.info, + tracing::Level::DEBUG => &self.debug, + tracing::Level::TRACE => &self.trace, + }; + counter.inc(); + } +} + +struct TracingEventCountLayer(&'static TracingEventCountMetric); impl tracing_subscriber::layer::Layer for TracingEventCountLayer where @@ -44,15 +77,7 @@ where event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>, ) { - let level = event.metadata().level(); - let level = match *level { - tracing::Level::ERROR => "error", - tracing::Level::WARN => "warn", - tracing::Level::INFO => "info", - tracing::Level::DEBUG => "debug", - tracing::Level::TRACE => "trace", - }; - self.0.with_label_values(&[level]).inc(); + self.0.inc_for_level(*event.metadata().level()); } } @@ -106,7 +131,9 @@ pub fn init( }; log_layer.with_filter(rust_log_env_filter()) }); - let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter())); + let r = r.with( + TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()), + ); match tracing_error_layer_enablement { TracingErrorLayerEnablement::EnableWithRustLogFilter => r 
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())) @@ -257,14 +284,14 @@ impl std::fmt::Debug for SecretString { mod tests { use metrics::{core::Opts, IntCounterVec}; - use super::TracingEventCountLayer; + use crate::logging::{TracingEventCountLayer, TracingEventCountMetric}; #[test] fn tracing_event_count_metric() { let counter_vec = IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap(); - let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static - let layer = TracingEventCountLayer(counter_vec); + let metric = Box::leak(Box::new(TracingEventCountMetric::new(counter_vec.clone()))); + let layer = TracingEventCountLayer(metric); use tracing_subscriber::prelude::*; tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || { diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 0661d3a969..f5723018d7 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -436,9 +436,9 @@ mod tests { event_mask: 0, }), expected_messages: vec![ - // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160000, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 }) + // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 }) vec![ - 103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110, 147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147, 188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1, @@ -478,7 +478,7 @@ mod tests { // walproposer will panic when it finishes sync_safekeepers std::panic::catch_unwind(|| wp.start()).unwrap_err(); // validate the resulting LSN - assert_eq!(receiver.recv()?, 1337); + assert_eq!(receiver.try_recv(), Ok(1337)); Ok(()) // drop() will free up resources here } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 35c260740c..9e8172c6a1 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -36,6 +36,7 @@ humantime.workspace = true humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true +md5.workspace = true nix.workspace = true # hack to get the number of worker threads tokio uses num_cpus = { version = "1.15" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 7607119dda..f65c4f4580 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -14,7 +14,7 @@ use pageserver::control_plane_client::ControlPlaneClient; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; use pageserver::task_mgr::WALRECEIVER_RUNTIME; -use pageserver::tenant::TenantSharedResources; +use pageserver::tenant::{secondary, TenantSharedResources}; use remote_storage::GenericRemoteStorage; use tokio::time::Instant; use tracing::*; @@ -504,6 +504,17 @@ fn start_pageserver( } }); + let secondary_controller = if let Some(remote_storage) = &remote_storage { + 
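+        // Remote storage is required for heatmap uploads: spawn the uploader background tasks
+        // only when it is configured; otherwise fall back to the no-op controller below.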
secondary::spawn_tasks( + tenant_manager.clone(), + remote_storage.clone(), + background_jobs_barrier.clone(), + shutdown_pageserver.clone(), + ) + } else { + secondary::null_controller() + }; + // shared state between the disk-usage backed eviction background task and the http endpoint // that allows triggering disk-usage based eviction manually. note that the http endpoint // is still accessible even if background task is not configured as long as remote storage has @@ -533,6 +544,7 @@ fn start_pageserver( broker_client.clone(), disk_usage_eviction_state, deletion_queue.new_client(), + secondary_controller, ) .context("Failed to initialize router state")?, ); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 13d1fc775b..cd99cda783 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -70,6 +70,8 @@ pub mod defaults { pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min"; pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s"; + pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8; + /// /// Default built-in configuration file. /// @@ -117,6 +119,8 @@ pub mod defaults { #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}' #gc_feedback = false +#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY} + [remote_storage] "# @@ -215,6 +219,10 @@ pub struct PageServerConf { /// If true, pageserver will make best-effort to operate without a control plane: only /// for use in major incidents. pub control_plane_emergency_mode: bool, + + /// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize + /// heatmap uploads vs. other remote storage operations. + pub heatmap_upload_concurrency: usize, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -293,6 +301,8 @@ struct PageServerConfigBuilder { control_plane_api: BuilderValue>, control_plane_api_token: BuilderValue>, control_plane_emergency_mode: BuilderValue, + + heatmap_upload_concurrency: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -361,6 +371,8 @@ impl Default for PageServerConfigBuilder { control_plane_api: Set(None), control_plane_api_token: Set(None), control_plane_emergency_mode: Set(false), + + heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY), } } } @@ -501,6 +513,10 @@ impl PageServerConfigBuilder { self.control_plane_emergency_mode = BuilderValue::Set(enabled) } + pub fn heatmap_upload_concurrency(&mut self, value: usize) { + self.heatmap_upload_concurrency = BuilderValue::Set(value) + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries @@ -595,6 +611,10 @@ impl PageServerConfigBuilder { control_plane_emergency_mode: self .control_plane_emergency_mode .ok_or(anyhow!("missing control_plane_emergency_mode"))?, + + heatmap_upload_concurrency: self + .heatmap_upload_concurrency + .ok_or(anyhow!("missing heatmap_upload_concurrency"))?, }) } } @@ -828,7 +848,9 @@ impl PageServerConf { }, "control_plane_emergency_mode" => { builder.control_plane_emergency_mode(parse_toml_bool(key, item)?) - + }, + "heatmap_upload_concurrency" => { + builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? 
as usize) }, _ => bail!("unrecognized pageserver option '{key}'"), } @@ -896,6 +918,7 @@ impl PageServerConf { control_plane_api: None, control_plane_api_token: None, control_plane_emergency_mode: false, + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, } } } @@ -1120,7 +1143,8 @@ background_task_maximum_delay = '334 s' )?, control_plane_api: None, control_plane_api_token: None, - control_plane_emergency_mode: false + control_plane_emergency_mode: false, + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY }, "Correct defaults should be used when no config values are provided" ); @@ -1177,7 +1201,8 @@ background_task_maximum_delay = '334 s' background_task_maximum_delay: Duration::from_secs(334), control_plane_api: None, control_plane_api_token: None, - control_plane_emergency_mode: false + control_plane_emergency_mode: false, + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 8f2b88d191..bde2cedca7 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -3,7 +3,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::tasks::BackgroundLoopKind; -use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError}; +use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant}; use camino::Utf8PathBuf; use consumption_metrics::EventType; use pageserver_api::models::TenantState; @@ -256,8 +256,6 @@ async fn calculate_synthetic_size_worker( info!("calculate_synthetic_size_worker stopped"); }; - let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize; - loop { let started_at = Instant::now(); @@ -280,29 +278,14 @@ async fn calculate_synthetic_size_worker( continue; } - if let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) { - // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks? - // We can put in some prioritization for consumption metrics. - // Same for the loop that fetches computed metrics. - // By using the same limiter, we centralize metrics collection for "start" and "finished" counters, - // which turns out is really handy to understand the system. - if let Err(e) = tenant.calculate_synthetic_size(cause, cancel, ctx).await { - // this error can be returned if timeline is shutting down, but it does not - // mean the synthetic size worker should terminate. we do not need any checks - // in this function because `mgr::get_tenant` will error out after shutdown has - // progressed to shutting down tenants. - let is_cancelled = matches!( - e.downcast_ref::(), - Some(PageReconstructError::Cancelled) - ); + let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) else { + continue; + }; - if !is_cancelled { - error!( - "failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}" - ); - } - } - } + // there is never any reason to exit calculate_synthetic_size_worker following any + // return value -- we don't need to care about shutdown because no tenant is found when + // pageserver is shut down. 
+ calculate_and_log(&tenant, cancel, ctx).await; } crate::tenant::tasks::warn_when_period_overrun( @@ -321,3 +304,31 @@ async fn calculate_synthetic_size_worker( } } } + +async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) { + const CAUSE: LogicalSizeCalculationCause = + LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize; + + // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks? + // We can put in some prioritization for consumption metrics. + // Same for the loop that fetches computed metrics. + // By using the same limiter, we centralize metrics collection for "start" and "finished" counters, + // which turns out is really handy to understand the system. + let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else { + return; + }; + + // this error can be returned if timeline is shutting down, but it does not + // mean the synthetic size worker should terminate. we do not need any checks + // in this function because `mgr::get_tenant` will error out after shutdown has + // progressed to shutting down tenants. + let shutting_down = matches!( + e.downcast_ref::(), + Some(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_)) + ); + + if !shutting_down { + let tenant_shard_id = tenant.tenant_shard_id(); + error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}"); + } +} diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f01cd1cf8c..76906cfaf7 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -42,7 +42,6 @@ // reading these fields. We use the Debug impl for semi-structured logging, though. 
use std::{ - collections::HashMap, sync::Arc, time::{Duration, SystemTime}, }; @@ -125,7 +124,7 @@ pub fn launch_disk_usage_global_eviction_task( async fn disk_usage_eviction_task( state: &State, task_config: &DiskUsageEvictionTaskConfig, - _storage: &GenericRemoteStorage, + storage: &GenericRemoteStorage, tenants_dir: &Utf8Path, cancel: CancellationToken, ) { @@ -149,8 +148,14 @@ async fn disk_usage_eviction_task( let start = Instant::now(); async { - let res = - disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await; + let res = disk_usage_eviction_task_iteration( + state, + task_config, + storage, + tenants_dir, + &cancel, + ) + .await; match res { Ok(()) => {} @@ -181,12 +186,13 @@ pub trait Usage: Clone + Copy + std::fmt::Debug { async fn disk_usage_eviction_task_iteration( state: &State, task_config: &DiskUsageEvictionTaskConfig, + storage: &GenericRemoteStorage, tenants_dir: &Utf8Path, cancel: &CancellationToken, ) -> anyhow::Result<()> { let usage_pre = filesystem_level_usage::get(tenants_dir, task_config) .context("get filesystem-level disk usage before evictions")?; - let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await; + let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await; match res { Ok(outcome) => { debug!(?outcome, "disk_usage_eviction_iteration finished"); @@ -268,8 +274,9 @@ struct LayerCount { count: usize, } -pub async fn disk_usage_eviction_task_iteration_impl( +pub(crate) async fn disk_usage_eviction_task_iteration_impl( state: &State, + _storage: &GenericRemoteStorage, usage_pre: U, cancel: &CancellationToken, ) -> anyhow::Result> { @@ -321,16 +328,16 @@ pub async fn disk_usage_eviction_task_iteration_impl( // Walk through the list of candidates, until we have accumulated enough layers to get // us back under the pressure threshold. 'usage_planned' is updated so that it tracks // how much disk space would be used after evicting all the layers up to the current - // point in the list. The layers are collected in 'batched', grouped per timeline. + // point in the list. // // If we get far enough in the list that we start to evict layers that are below // the tenant's min-resident-size threshold, print a warning, and memorize the disk // usage at that point, in 'usage_planned_min_resident_size_respecting'. 
- let mut batched: HashMap<_, Vec<_>> = HashMap::new(); let mut warned = None; let mut usage_planned = usage_pre; - let mut max_batch_size = 0; - for (i, (partition, candidate)) in candidates.into_iter().enumerate() { + let mut evicted_amount = 0; + + for (i, (partition, candidate)) in candidates.iter().enumerate() { if !usage_planned.has_pressure() { debug!( no_candidates_evicted = i, @@ -339,25 +346,13 @@ pub async fn disk_usage_eviction_task_iteration_impl( break; } - if partition == MinResidentSizePartition::Below && warned.is_none() { + if partition == &MinResidentSizePartition::Below && warned.is_none() { warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"); warned = Some(usage_planned); } usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size); - - // FIXME: batching makes no sense anymore because of no layermap locking, should just spawn - // tasks to evict all seen layers until we have evicted enough - - let batch = batched.entry(TimelineKey(candidate.timeline)).or_default(); - - // semaphore will later be used to limit eviction concurrency, and we can express at - // most u32 number of permits. unlikely we would have u32::MAX layers to be evicted, - // but fail gracefully by not making batches larger. - if batch.len() < u32::MAX as usize { - batch.push(candidate.layer); - max_batch_size = max_batch_size.max(batch.len()); - } + evicted_amount += 1; } let usage_planned = match warned { @@ -372,100 +367,79 @@ pub async fn disk_usage_eviction_task_iteration_impl( }; debug!(?usage_planned, "usage planned"); - // phase2: evict victims batched by timeline + // phase2: evict layers let mut js = tokio::task::JoinSet::new(); + let limit = 1000; - // ratelimit to 1k files or any higher max batch size - let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size))); + let mut evicted = candidates.into_iter().take(evicted_amount).fuse(); + let mut consumed_all = false; - for (timeline, batch) in batched { - let tenant_shard_id = timeline.tenant_shard_id; - let timeline_id = timeline.timeline_id; - let batch_size = - u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning"); + // After the evictions, `usage_assumed` is the post-eviction usage, + // according to internal accounting. 
+ let mut usage_assumed = usage_pre; + let mut evictions_failed = LayerCount::default(); - // I dislike naming of `available_permits` but it means current total amount of permits - // because permits can be added - assert!(batch_size as usize <= limit.available_permits()); + let evict_layers = async move { + loop { + let next = if js.len() >= limit || consumed_all { + js.join_next().await + } else if !js.is_empty() { + // opportunistically consume ready result, one per each new evicted + futures::future::FutureExt::now_or_never(js.join_next()).and_then(|x| x) + } else { + None + }; - debug!(%timeline_id, "evicting batch for timeline"); - - let evict = { - let limit = limit.clone(); - let cancel = cancel.clone(); - async move { - let mut evicted_bytes = 0; - let mut evictions_failed = LayerCount::default(); - - let Ok(_permit) = limit.acquire_many_owned(batch_size).await else { - // semaphore closing means cancelled - return (evicted_bytes, evictions_failed); - }; - - let results = timeline.evict_layers(&batch).await; - - match results { - Ok(results) => { - assert_eq!(results.len(), batch.len()); - for (result, layer) in results.into_iter().zip(batch.iter()) { - let file_size = layer.layer_desc().file_size; - match result { - Some(Ok(())) => { - evicted_bytes += file_size; - } - Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { - evictions_failed.file_sizes += file_size; - evictions_failed.count += 1; - } - None => { - assert!(cancel.is_cancelled()); - } - } - } + if let Some(next) = next { + match next { + Ok(Ok(file_size)) => { + usage_assumed.add_available_bytes(file_size); } - Err(e) => { - warn!("failed to evict batch: {:#}", e); + Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => { + evictions_failed.file_sizes += file_size; + evictions_failed.count += 1; } + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { /* already logged */ } + Err(je) => tracing::error!("unknown JoinError: {je:?}"), } - (evicted_bytes, evictions_failed) } - } - .instrument(tracing::info_span!("evict_batch", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, batch_size)); - js.spawn(evict); - - // spwaning multiple thousands of these is essentially blocking, so give already spawned a - // chance of making progress - tokio::task::yield_now().await; - } - - let join_all = async move { - // After the evictions, `usage_assumed` is the post-eviction usage, - // according to internal accounting. - let mut usage_assumed = usage_pre; - let mut evictions_failed = LayerCount::default(); - - while let Some(res) = js.join_next().await { - match res { - Ok((evicted_bytes, failed)) => { - usage_assumed.add_available_bytes(evicted_bytes); - evictions_failed.file_sizes += failed.file_sizes; - evictions_failed.count += failed.count; - } - Err(je) if je.is_cancelled() => unreachable!("not used"), - Err(je) if je.is_panic() => { /* already logged */ } - Err(je) => tracing::error!("unknown JoinError: {je:?}"), + if consumed_all && js.is_empty() { + break; } + + // calling again when consumed_all is fine as evicted is fused. 
+ let Some((_partition, candidate)) = evicted.next() else { + consumed_all = true; + continue; + }; + + js.spawn(async move { + let rtc = candidate.timeline.remote_client.as_ref().expect( + "holding the witness, all timelines must have a remote timeline client", + ); + let file_size = candidate.layer.layer_desc().file_size; + candidate + .layer + .evict_and_wait(rtc) + .await + .map(|()| file_size) + .map_err(|e| (file_size, e)) + }); + + tokio::task::yield_now().await; } + (usage_assumed, evictions_failed) }; let (usage_assumed, evictions_failed) = tokio::select! { - tuple = join_all => { tuple }, + tuple = evict_layers => { tuple }, _ = cancel.cancelled() => { - // close the semaphore to stop any pending acquires - limit.close(); + // dropping joinset will abort all pending evict_and_waits and that is fine, our + // requests will still stand return Ok(IterationOutcome::Cancelled); } }; diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 9422ccb2fd..b79c5ada9a 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -992,8 +992,8 @@ paths: type: string post: description: | - Create a timeline. Returns new timeline id on success.\ - If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline. + Create a timeline. Returns new timeline id on success. + Recreating the same timeline will succeed if the parameters match the existing timeline. If no pg_version is specified, assume DEFAULT_PG_VERSION hardcoded in the pageserver. requestBody: content: @@ -1405,6 +1405,8 @@ components: type: integer trace_read_requests: type: boolean + heatmap_period: + type: integer TenantConfigResponse: type: object properties: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index d083868599..c63c691726 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -38,6 +38,7 @@ use crate::tenant::mgr::{ GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError, TenantSlotUpsertError, TenantStateError, }; +use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::timeline::CompactFlags; @@ -75,9 +76,11 @@ pub struct State { broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, + secondary_controller: SecondaryController, } impl State { + #[allow(clippy::too_many_arguments)] pub fn new( conf: &'static PageServerConf, tenant_manager: Arc, @@ -86,6 +89,7 @@ impl State { broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, + secondary_controller: SecondaryController, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"] .iter() @@ -100,6 +104,7 @@ impl State { broker_client, disk_usage_eviction_state, deletion_queue_client, + secondary_controller, }) } @@ -136,11 +141,6 @@ impl From for ApiError { fn from(pre: PageReconstructError) -> ApiError { match pre { PageReconstructError::Other(pre) => ApiError::InternalServerError(pre), - PageReconstructError::NeedsDownload(_, _) => { - // This shouldn't happen, because we use a RequestContext that requests to - // download any missing layer files on-demand. 
- ApiError::InternalServerError(anyhow::anyhow!("need to download remote layer file")) - } PageReconstructError::Cancelled => { ApiError::InternalServerError(anyhow::anyhow!("request was cancelled")) } @@ -453,7 +453,7 @@ async fn timeline_create_handler( .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } - Err(tenant::CreateTimelineError::AlreadyExists) => { + Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => { json_response(StatusCode::CONFLICT, ()) } Err(tenant::CreateTimelineError::AncestorLsn(err)) => { @@ -1546,7 +1546,7 @@ async fn always_panic_handler( async fn disk_usage_eviction_run( mut r: Request, - _cancel: CancellationToken, + cancel: CancellationToken, ) -> Result, ApiError> { check_permission(&r, None)?; @@ -1583,48 +1583,41 @@ async fn disk_usage_eviction_run( freed_bytes: 0, }; - let (tx, rx) = tokio::sync::oneshot::channel(); - let state = get_state(&r); - if state.remote_storage.as_ref().is_none() { + let Some(storage) = state.remote_storage.as_ref() else { return Err(ApiError::InternalServerError(anyhow::anyhow!( "remote storage not configured, cannot run eviction iteration" ))); - } + }; let state = state.disk_usage_eviction_state.clone(); - let cancel = CancellationToken::new(); - let child_cancel = cancel.clone(); - let _g = cancel.drop_guard(); + let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( + &state, storage, usage, &cancel, + ) + .await; - crate::task_mgr::spawn( - crate::task_mgr::BACKGROUND_RUNTIME.handle(), - TaskKind::DiskUsageEviction, - None, - None, - "ondemand disk usage eviction", - false, - async move { - let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( - &state, - usage, - &child_cancel, - ) - .await; + info!(?res, "disk_usage_eviction_task_iteration_impl finished"); - info!(?res, "disk_usage_eviction_task_iteration_impl finished"); + let res = res.map_err(ApiError::InternalServerError)?; - let _ = tx.send(res); - Ok(()) - } - .in_current_span(), - ); + json_response(StatusCode::OK, res) +} - let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?; +async fn secondary_upload_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let state = get_state(&request); + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + state + .secondary_controller + .upload_tenant(tenant_shard_id) + .await + .map_err(ApiError::InternalServerError)?; - json_response(StatusCode::OK, response) + json_response(StatusCode::OK, ()) } async fn handler_404(_: Request) -> Result, ApiError> { @@ -1886,6 +1879,9 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name", |r| api_handler(r, evict_timeline_layer_handler), ) + .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| { + api_handler(r, secondary_upload_handler) + }) .put("/v1/disk_usage_eviction/run", |r| { api_handler(r, disk_usage_eviction_run) }) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 7cc0333ee5..ba6fd00bd1 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2,9 +2,10 @@ use enum_map::EnumMap; use metrics::metric_vec_duration::DurationResultObserver; use metrics::{ register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec, - register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, - register_uint_gauge, 
register_uint_gauge_vec, Counter, CounterVec, GaugeVec, Histogram, - HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec, + register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, + register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec, + Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPairVec, + IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec, }; use once_cell::sync::Lazy; use pageserver_api::shard::TenantShardId; @@ -1270,6 +1271,28 @@ pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| WalIngestMet ) .expect("failed to define a metric"), }); +pub(crate) struct SecondaryModeMetrics { + pub(crate) upload_heatmap: IntCounter, + pub(crate) upload_heatmap_errors: IntCounter, + pub(crate) upload_heatmap_duration: Histogram, +} +pub(crate) static SECONDARY_MODE: Lazy = Lazy::new(|| SecondaryModeMetrics { + upload_heatmap: register_int_counter!( + "pageserver_secondary_upload_heatmap", + "Number of heatmaps written to remote storage by attached tenants" + ) + .expect("failed to define a metric"), + upload_heatmap_errors: register_int_counter!( + "pageserver_secondary_upload_heatmap_errors", + "Failures writing heatmap to remote storage" + ) + .expect("failed to define a metric"), + upload_heatmap_duration: register_histogram!( + "pageserver_secondary_upload_heatmap_duration", + "Time to build and upload a heatmap, including any waiting inside the S3 client" + ) + .expect("failed to define a metric"), +}); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum RemoteOpKind { @@ -1321,25 +1344,16 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy = Lazy::new(|| { .expect("Failed to register tenant_task_events metric") }); -pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT: Lazy = - Lazy::new(|| { - register_int_counter_vec!( - "pageserver_background_loop_semaphore_wait_start_count", - "Counter for background loop concurrency-limiting semaphore acquire calls started", - &["task"], - ) - .unwrap() - }); - -pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT: Lazy = - Lazy::new(|| { - register_int_counter_vec!( - "pageserver_background_loop_semaphore_wait_finish_count", - "Counter for background loop concurrency-limiting semaphore acquire calls finished", - &["task"], - ) - .unwrap() - }); +pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "pageserver_background_loop_semaphore_wait_start_count", + "Counter for background loop concurrency-limiting semaphore acquire calls started", + "pageserver_background_loop_semaphore_wait_finish_count", + "Counter for background loop concurrency-limiting semaphore acquire calls finished", + &["task"], + ) + .unwrap() +}); pub(crate) static BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 8747d9ad50..b80a498c82 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -258,6 +258,9 @@ pub enum TaskKind { /// See [`crate::disk_usage_eviction_task`]. DiskUsageEviction, + /// See [`crate::tenant::secondary`]. 
+ SecondaryUploads, + // Initial logical size calculation InitialLogicalSizeCalculation, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a8e8b4cbfa..969210622c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -48,6 +48,7 @@ use self::mgr::GetActiveTenantError; use self::mgr::GetTenantError; use self::mgr::TenantsMap; use self::remote_timeline_client::RemoteTimelineClient; +use self::timeline::uninit::TimelineExclusionError; use self::timeline::uninit::TimelineUninitMark; use self::timeline::uninit::UninitializedTimeline; use self::timeline::EvictionTaskTenantState; @@ -87,7 +88,6 @@ use std::process::Stdio; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::MutexGuard; use std::sync::{Mutex, RwLock}; use std::time::{Duration, Instant}; @@ -144,6 +144,7 @@ pub mod storage_layer; pub mod config; pub mod delete; pub mod mgr; +pub mod secondary; pub mod tasks; pub mod upload_queue; @@ -248,6 +249,12 @@ pub struct Tenant { generation: Generation, timelines: Mutex>>, + + /// During timeline creation, we first insert the TimelineId to the + /// creating map, then `timelines`, then remove it from the creating map. + /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating` + timelines_creating: std::sync::Mutex>, + // This mutex prevents creation of new timelines during GC. // Adding yet another mutex (in addition to `timelines`) is needed because holding // `timelines` mutex during all GC iteration @@ -406,8 +413,10 @@ impl Debug for SetStoppingError { #[derive(thiserror::Error, Debug)] pub enum CreateTimelineError { - #[error("a timeline with the given ID already exists")] - AlreadyExists, + #[error("creation of timeline with the given ID is in progress")] + AlreadyCreating, + #[error("timeline already exists with different parameters")] + Conflict, #[error(transparent)] AncestorLsn(anyhow::Error), #[error("ancestor timeline is not active")] @@ -1457,7 +1466,7 @@ impl Tenant { /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the /// minimum amount of keys required to get a writable timeline. /// (Without it, `put` might fail due to `repartition` failing.) - pub async fn create_empty_timeline( + pub(crate) async fn create_empty_timeline( &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, @@ -1469,10 +1478,7 @@ impl Tenant { "Cannot create empty timelines on inactive tenant" ); - let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); - self.create_timeline_uninit_mark(new_timeline_id, &timelines)? - }; + let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?; let new_metadata = TimelineMetadata::new( // Initialize disk_consistent LSN to 0, The caller must import some data to // make it valid, before calling finish_creation() @@ -1549,7 +1555,7 @@ impl Tenant { /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists. 
#[allow(clippy::too_many_arguments)] - pub async fn create_timeline( + pub(crate) async fn create_timeline( &self, new_timeline_id: TimelineId, ancestor_timeline_id: Option, @@ -1570,26 +1576,51 @@ impl Tenant { .enter() .map_err(|_| CreateTimelineError::ShuttingDown)?; - if let Ok(existing) = self.get_timeline(new_timeline_id, false) { - debug!("timeline {new_timeline_id} already exists"); - - if let Some(remote_client) = existing.remote_client.as_ref() { - // Wait for uploads to complete, so that when we return Ok, the timeline - // is known to be durable on remote storage. Just like we do at the end of - // this function, after we have created the timeline ourselves. - // - // We only really care that the initial version of `index_part.json` has - // been uploaded. That's enough to remember that the timeline - // exists. However, there is no function to wait specifically for that so - // we just wait for all in-progress uploads to finish. - remote_client - .wait_completion() - .await - .context("wait for timeline uploads to complete")?; + // Get exclusive access to the timeline ID: this ensures that it does not already exist, + // and that no other creation attempts will be allowed in while we are working. The + // uninit_mark is a guard. + let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) { + Ok(m) => m, + Err(TimelineExclusionError::AlreadyCreating) => { + // Creation is in progress, we cannot create it again, and we cannot + // check if this request matches the existing one, so caller must try + // again later. + return Err(CreateTimelineError::AlreadyCreating); } + Err(TimelineExclusionError::Other(e)) => { + return Err(CreateTimelineError::Other(e)); + } + Err(TimelineExclusionError::AlreadyExists(existing)) => { + debug!("timeline {new_timeline_id} already exists"); - return Err(CreateTimelineError::AlreadyExists); - } + // Idempotency: creating the same timeline twice is not an error, unless + // the second creation has different parameters. + if existing.get_ancestor_timeline_id() != ancestor_timeline_id + || existing.pg_version != pg_version + || (ancestor_start_lsn.is_some() + && ancestor_start_lsn != Some(existing.get_ancestor_lsn())) + { + return Err(CreateTimelineError::Conflict); + } + + if let Some(remote_client) = existing.remote_client.as_ref() { + // Wait for uploads to complete, so that when we return Ok, the timeline + // is known to be durable on remote storage. Just like we do at the end of + // this function, after we have created the timeline ourselves. + // + // We only really care that the initial version of `index_part.json` has + // been uploaded. That's enough to remember that the timeline + // exists. However, there is no function to wait specifically for that so + // we just wait for all in-progress uploads to finish. + remote_client + .wait_completion() + .await + .context("wait for timeline uploads to complete")?; + } + + return Ok(existing); + } + }; let loaded_timeline = match ancestor_timeline_id { Some(ancestor_timeline_id) => { @@ -1626,18 +1657,32 @@ impl Tenant { ancestor_timeline.wait_lsn(*lsn, ctx).await?; } - self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx) - .await? + self.branch_timeline( + &ancestor_timeline, + new_timeline_id, + ancestor_start_lsn, + uninit_mark, + ctx, + ) + .await? } None => { - self.bootstrap_timeline(new_timeline_id, pg_version, load_existing_initdb, ctx) - .await? 
+ self.bootstrap_timeline( + new_timeline_id, + pg_version, + load_existing_initdb, + uninit_mark, + ctx, + ) + .await? } }; + // At this point we have dropped our guard on [`Self::timelines_creating`], and + // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must + // not send a success to the caller until it is. The same applies to handling retries, + // see the handling of [`TimelineExclusionError::AlreadyExists`] above. if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { - // Wait for the upload of the 'index_part.json` file to finish, so that when we return - // Ok, the timeline is durable in remote storage. let kind = ancestor_timeline_id .map(|_| "branched") .unwrap_or("bootstrapped"); @@ -2114,6 +2159,14 @@ impl Tenant { .attach_mode .clone() } + + pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId { + &self.tenant_shard_id + } + + pub(crate) fn get_generation(&self) -> Generation { + self.generation + } } /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), @@ -2252,6 +2305,18 @@ impl Tenant { .or(self.conf.default_tenant_conf.min_resident_size_override) } + pub fn get_heatmap_period(&self) -> Option { + let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf; + let heatmap_period = tenant_conf + .heatmap_period + .unwrap_or(self.conf.default_tenant_conf.heatmap_period); + if heatmap_period.is_zero() { + None + } else { + Some(heatmap_period) + } + } + pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf; // Don't hold self.timelines.lock() during the notifies. @@ -2401,6 +2466,7 @@ impl Tenant { loading_started_at: Instant::now(), tenant_conf: Arc::new(RwLock::new(attached_conf)), timelines: Mutex::new(HashMap::new()), + timelines_creating: Mutex::new(HashSet::new()), gc_cs: tokio::sync::Mutex::new(()), walredo_mgr, remote_storage, @@ -2792,8 +2858,9 @@ impl Tenant { start_lsn: Option, ctx: &RequestContext, ) -> Result, CreateTimelineError> { + let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap(); let tl = self - .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx) + .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx) .await?; tl.set_state(TimelineState::Active); Ok(tl) @@ -2807,9 +2874,10 @@ impl Tenant { src_timeline: &Arc, dst_id: TimelineId, start_lsn: Option, + timeline_uninit_mark: TimelineUninitMark<'_>, ctx: &RequestContext, ) -> Result, CreateTimelineError> { - self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx) + self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx) .await } @@ -2818,13 +2886,14 @@ impl Tenant { src_timeline: &Arc, dst_id: TimelineId, start_lsn: Option, + timeline_uninit_mark: TimelineUninitMark<'_>, _ctx: &RequestContext, ) -> Result, CreateTimelineError> { let src_id = src_timeline.timeline_id; - // First acquire the GC lock so that another task cannot advance the GC - // cutoff in 'gc_info', and make 'start_lsn' invalid, while we are - // creating the branch. + // We will validate our ancestor LSN in this function. Acquire the GC lock so that + // this check cannot race with GC, and the ancestor LSN is guaranteed to remain + // valid while we are creating the branch. let _gc_cs = self.gc_cs.lock().await; // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN @@ -2834,13 +2903,6 @@ impl Tenant { lsn }); - // Create a placeholder for the new branch. 
This will error - // out if the new timeline ID is already in use. - let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); - self.create_timeline_uninit_mark(dst_id, &timelines)? - }; - // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR // horizon on the source timeline // @@ -2932,21 +2994,38 @@ impl Tenant { Ok(new_timeline) } - /// - run initdb to init temporary instance and get bootstrap data - /// - after initialization completes, tar up the temp dir and upload it to S3. - /// - /// The caller is responsible for activating the returned timeline. - pub(crate) async fn bootstrap_timeline( + /// For unit tests, make this visible so that other modules can directly create timelines + #[cfg(test)] + pub(crate) async fn bootstrap_timeline_test( &self, timeline_id: TimelineId, pg_version: u32, load_existing_initdb: Option, ctx: &RequestContext, ) -> anyhow::Result> { - let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); - self.create_timeline_uninit_mark(timeline_id, &timelines)? - }; + let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap(); + self.bootstrap_timeline( + timeline_id, + pg_version, + load_existing_initdb, + uninit_mark, + ctx, + ) + .await + } + + /// - run initdb to init temporary instance and get bootstrap data + /// - after initialization completes, tar up the temp dir and upload it to S3. + /// + /// The caller is responsible for activating the returned timeline. + async fn bootstrap_timeline( + &self, + timeline_id: TimelineId, + pg_version: u32, + load_existing_initdb: Option, + timeline_uninit_mark: TimelineUninitMark<'_>, + ctx: &RequestContext, + ) -> anyhow::Result> { // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // temporary directory for basebackup files for the given timeline. @@ -3143,11 +3222,11 @@ impl Tenant { /// at 'disk_consistent_lsn'. After any initial data has been imported, call /// `finish_creation` to insert the Timeline into the timelines map and to remove the /// uninit mark file. - async fn prepare_new_timeline( - &self, + async fn prepare_new_timeline<'a>( + &'a self, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, - uninit_mark: TimelineUninitMark, + uninit_mark: TimelineUninitMark<'a>, start_lsn: Lsn, ancestor: Option>, ) -> anyhow::Result { @@ -3220,23 +3299,38 @@ impl Tenant { fn create_timeline_uninit_mark( &self, timeline_id: TimelineId, - timelines: &MutexGuard>>, - ) -> anyhow::Result { + ) -> Result { let tenant_shard_id = self.tenant_shard_id; - anyhow::ensure!( - timelines.get(&timeline_id).is_none(), - "Timeline {tenant_shard_id}/{timeline_id} already exists in pageserver's memory" - ); - let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id); - anyhow::ensure!( - !timeline_path.exists(), - "Timeline {timeline_path} already exists, cannot create its uninit mark file", - ); - let uninit_mark_path = self .conf .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id); + let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id); + + let uninit_mark = TimelineUninitMark::new( + self, + timeline_id, + uninit_mark_path.clone(), + timeline_path.clone(), + )?; + + // At this stage, we have got exclusive access to in-memory state for this timeline ID + // for creation. 
+ // A timeline directory should never exist on disk already: + // - a previous failed creation would have cleaned up after itself + // - a pageserver restart would clean up timeline directories that don't have valid remote state + // + // Therefore it is an unexpected internal error to encounter a timeline directory already existing here, + // this error may indicate a bug in cleanup on failed creations. + if timeline_path.exists() { + return Err(TimelineExclusionError::Other(anyhow::anyhow!( + "Timeline directory already exists! This is a bug." + ))); + } + + // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees + // that during process runtime, colliding creations will be caught in-memory without getting + // as far as failing to write a file. fs::OpenOptions::new() .write(true) .create_new(true) @@ -3250,8 +3344,6 @@ impl Tenant { format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}") })?; - let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path); - Ok(uninit_mark) } @@ -3694,6 +3786,7 @@ pub(crate) mod harness { tenant_conf.evictions_low_residence_duration_metric_threshold, ), gc_feedback: Some(tenant_conf.gc_feedback), + heatmap_period: Some(tenant_conf.heatmap_period), } } } @@ -4000,13 +4093,7 @@ mod tests { .await { Ok(_) => panic!("duplicate timeline creation should fail"), - Err(e) => assert_eq!( - e.to_string(), - format!( - "Timeline {}/{} already exists in pageserver's memory", - tenant.tenant_shard_id, TIMELINE_ID - ) - ), + Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()), } Ok(()) diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 7a454b53d2..25d97f51ce 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -334,6 +334,11 @@ pub struct TenantConf { #[serde(with = "humantime_serde")] pub evictions_low_residence_duration_metric_threshold: Duration, pub gc_feedback: bool, + + /// If non-zero, the period between uploads of a heatmap from attached tenants. This + /// may be disabled if a Tenant will not have secondary locations: only secondary + /// locations will use the heatmap uploaded by attached locations. 
+ pub heatmap_period: Duration, } /// Same as TenantConf, but this struct preserves the information about @@ -414,6 +419,11 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub gc_feedback: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + #[serde(default)] + pub heatmap_period: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -482,6 +492,7 @@ impl TenantConfOpt { .evictions_low_residence_duration_metric_threshold .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold), gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback), + heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period), } } } @@ -519,6 +530,7 @@ impl Default for TenantConf { ) .expect("cannot parse default evictions_low_residence_duration_metric_threshold"), gc_feedback: false, + heatmap_period: Duration::ZERO, } } } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4d7bd4259f..f53951e1d3 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -807,6 +807,12 @@ pub(crate) async fn set_new_tenant_config( } impl TenantManager { + /// Convenience function so that anyone with a TenantManager can get at the global configuration, without + /// having to pass it around everywhere as a separate object. + pub(crate) fn get_conf(&self) -> &'static PageServerConf { + self.conf + } + /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query. /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants. pub(crate) fn get_attached_tenant_shard( @@ -1087,6 +1093,20 @@ impl TenantManager { Ok(()) } + + pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec> { + let locked = self.tenants.read().unwrap(); + match &*locked { + TenantsMap::Initializing => Vec::new(), + TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map + .values() + .filter_map(|slot| { + slot.get_attached() + .and_then(|t| if t.is_active() { Some(t.clone()) } else { None }) + }) + .collect(), + } + } } #[derive(Debug, thiserror::Error)] diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 3765ff6e7a..4b271a7395 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -180,7 +180,7 @@ //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync //! 
[`Timeline::load_layer_map`]: super::Timeline::load_layer_map -mod download; +pub(crate) mod download; pub mod index; mod upload; @@ -1604,6 +1604,23 @@ impl RemoteTimelineClient { } } } + + pub(crate) fn get_layers_metadata( + &self, + layers: Vec, + ) -> anyhow::Result>> { + let q = self.upload_queue.lock().unwrap(); + let q = match &*q { + UploadQueue::Stopped(_) | UploadQueue::Uninitialized => { + anyhow::bail!("queue is in state {}", q.as_str()) + } + UploadQueue::Initialized(inner) => inner, + }; + + let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned()); + + Ok(decorated.collect()) + } } pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath { @@ -1659,6 +1676,13 @@ pub fn remote_index_path( .expect("Failed to construct path") } +pub const HEATMAP_BASENAME: &str = "heatmap-v1.json"; + +pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath { + RemotePath::from_string(&format!("tenants/{tenant_shard_id}/{HEATMAP_BASENAME}")) + .expect("Failed to construct path") +} + /// Given the key of an index, parse out the generation part of the name pub fn parse_remote_index_path(path: RemotePath) -> Option { let file_name = match path.get_path().file_name() { diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs new file mode 100644 index 0000000000..d25fe56b92 --- /dev/null +++ b/pageserver/src/tenant/secondary.rs @@ -0,0 +1,104 @@ +pub mod heatmap; +mod heatmap_uploader; + +use std::sync::Arc; + +use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; + +use self::heatmap_uploader::heatmap_uploader_task; + +use super::mgr::TenantManager; + +use pageserver_api::shard::TenantShardId; +use remote_storage::GenericRemoteStorage; + +use tokio_util::sync::CancellationToken; +use utils::completion::Barrier; + +enum UploadCommand { + Upload(TenantShardId), +} + +struct CommandRequest { + payload: T, + response_tx: tokio::sync::oneshot::Sender, +} + +struct CommandResponse { + result: anyhow::Result<()>, +} + +/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads, +/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests, +/// where we want to immediately upload/download for a particular tenant. In normal operation +/// uploads & downloads are autonomous and not driven by this interface. 
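// A minimal tokio-only sketch (hypothetical names) of the dispatch pattern SecondaryController
// uses below: the caller sends a payload plus a oneshot responder over an mpsc channel, and the
// background task replies through the oneshot once the work is done.
use tokio::sync::{mpsc, oneshot};

struct Request {
    payload: u64,
    response_tx: oneshot::Sender<Result<(), String>>,
}

async fn dispatch(queue: &mpsc::Sender<Request>, payload: u64) -> Result<(), String> {
    let (response_tx, response_rx) = oneshot::channel();
    queue
        .send(Request { payload, response_tx })
        .await
        .map_err(|_| "receiver shut down".to_string())?;
    response_rx.await.map_err(|_| "request dropped".to_string())?
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(16);
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            // Pretend to do the work for this payload, then acknowledge the caller.
            let _work_item = req.payload;
            let _ = req.response_tx.send(Ok(()));
        }
    });
    assert!(dispatch(&tx, 42).await.is_ok());
}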
+pub struct SecondaryController { + upload_req_tx: tokio::sync::mpsc::Sender>, +} + +impl SecondaryController { + async fn dispatch( + &self, + queue: &tokio::sync::mpsc::Sender>, + payload: T, + ) -> anyhow::Result<()> { + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + + queue + .send(CommandRequest { + payload, + response_tx, + }) + .await + .map_err(|_| anyhow::anyhow!("Receiver shut down"))?; + + let response = response_rx + .await + .map_err(|_| anyhow::anyhow!("Request dropped"))?; + + response.result + } + + pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> { + self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id)) + .await + } +} + +pub fn spawn_tasks( + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) -> SecondaryController { + let (upload_req_tx, upload_req_rx) = + tokio::sync::mpsc::channel::>(16); + + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::SecondaryUploads, + None, + None, + "heatmap uploads", + false, + async move { + heatmap_uploader_task( + tenant_manager, + remote_storage, + upload_req_rx, + background_jobs_can_start, + cancel, + ) + .await + }, + ); + + SecondaryController { upload_req_tx } +} + +/// For running with remote storage disabled: a SecondaryController that is connected to nothing. +pub fn null_controller() -> SecondaryController { + let (upload_req_tx, _upload_req_rx) = + tokio::sync::mpsc::channel::>(16); + SecondaryController { upload_req_tx } +} diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs new file mode 100644 index 0000000000..99aaaeb8c8 --- /dev/null +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -0,0 +1,64 @@ +use std::time::SystemTime; + +use crate::tenant::{ + remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName, +}; + +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr, TimestampSeconds}; + +use utils::{generation::Generation, id::TimelineId}; + +#[derive(Serialize, Deserialize)] +pub(super) struct HeatMapTenant { + /// Generation of the attached location that uploaded the heatmap: this is not required + /// for correctness, but acts as a hint to secondary locations in order to detect thrashing + /// in the unlikely event that two attached locations are both uploading conflicting heatmaps. + pub(super) generation: Generation, + + pub(super) timelines: Vec, +} + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub(crate) struct HeatMapTimeline { + #[serde_as(as = "DisplayFromStr")] + pub(super) timeline_id: TimelineId, + + pub(super) layers: Vec, +} + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub(crate) struct HeatMapLayer { + pub(super) name: LayerFileName, + pub(super) metadata: IndexLayerMetadata, + + #[serde_as(as = "TimestampSeconds")] + pub(super) access_time: SystemTime, + // TODO: an actual 'heat' score that would let secondary locations prioritize downloading + // the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary. 
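// The heatmap structs above lean on serde_with adapters; their generic parameters are not visible
// in this diff, so this standalone sketch assumes the common `DisplayFromStr` and
// `TimestampSeconds<i64>` forms, with serde, serde_json, and serde_with as dependencies.
// Field names are illustrative only.
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
use std::time::SystemTime;

#[serde_as]
#[derive(Serialize, Deserialize)]
struct LayerEntry {
    // Serialized as a plain string, using the type's Display/FromStr impls.
    #[serde_as(as = "DisplayFromStr")]
    timeline_id: u64,
    // Serialized as integer seconds since the Unix epoch instead of serde's default struct form.
    #[serde_as(as = "TimestampSeconds<i64>")]
    access_time: SystemTime,
}

fn main() {
    let entry = LayerEntry { timeline_id: 7, access_time: SystemTime::UNIX_EPOCH };
    // Expected shape: {"timeline_id":"7","access_time":0}
    println!("{}", serde_json::to_string(&entry).unwrap());
}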
+} + +impl HeatMapLayer { + pub(crate) fn new( + name: LayerFileName, + metadata: IndexLayerMetadata, + access_time: SystemTime, + ) -> Self { + Self { + name, + metadata, + access_time, + } + } +} + +impl HeatMapTimeline { + pub(crate) fn new(timeline_id: TimelineId, layers: Vec) -> Self { + Self { + timeline_id, + layers, + } + } +} diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs new file mode 100644 index 0000000000..ece2b93ce1 --- /dev/null +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -0,0 +1,582 @@ +use std::{ + collections::HashMap, + sync::{Arc, Weak}, + time::{Duration, Instant}, +}; + +use crate::{ + metrics::SECONDARY_MODE, + tenant::{ + config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path, + secondary::CommandResponse, span::debug_assert_current_span_has_tenant_id, Tenant, + }, +}; + +use md5; +use pageserver_api::shard::TenantShardId; +use remote_storage::GenericRemoteStorage; + +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use tracing::instrument; +use utils::{backoff, completion::Barrier}; + +use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand}; + +/// Period between heatmap uploader walking Tenants to look for work to do. +/// If any tenants have a heatmap upload period lower than this, it will be adjusted +/// downward to match. +const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000); +const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000); + +struct WriteInProgress { + barrier: Barrier, +} + +struct UploadPending { + tenant: Arc, + last_digest: Option, +} + +struct WriteComplete { + tenant_shard_id: TenantShardId, + completed_at: Instant, + digest: Option, + next_upload: Option, +} + +/// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember +/// when we last did a write. We only populate this after doing at least one +/// write for a tenant -- this avoids holding state for tenants that have +/// uploads disabled. + +struct UploaderTenantState { + // This Weak only exists to enable culling idle instances of this type + // when the Tenant has been deallocated. + tenant: Weak, + + /// Digest of the serialized heatmap that we last successfully uploaded + /// + /// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag, + /// which is also an md5sum. + last_digest: Option, + + /// When the last upload attempt completed (may have been successful or failed) + last_upload: Option, + + /// When should we next do an upload? None means never. + next_upload: Option, +} + +/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event +/// handling loop and mutates it as needed: there are no locks here, because that event loop +/// can hold &mut references to this type throughout. +struct HeatmapUploader { + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + cancel: CancellationToken, + + tenants: HashMap, + + /// Tenants with work to do, for which tasks should be spawned as soon as concurrency + /// limits permit it. + tenants_pending: std::collections::VecDeque, + + /// Tenants for which a task in `tasks` has been spawned. 
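// `UploaderTenantState` above keeps only a `Weak` reference to the tenant so that entries for
// deallocated tenants can be culled; the scheduling loop below does this with
// `v.tenant.upgrade().is_some()`. A tiny std-only sketch of that retain-on-upgrade idiom,
// with a String standing in for the real Tenant type:
use std::collections::HashMap;
use std::sync::{Arc, Weak};

struct State { tenant: Weak<String> }

fn cull(states: &mut HashMap<u64, State>) {
    // Keep an entry only while the strong Arc it points at is still alive somewhere.
    states.retain(|_, s| s.tenant.upgrade().is_some());
}

fn main() {
    let live = Arc::new("live tenant".to_string());
    let dead = Arc::new("dead tenant".to_string());
    let mut states = HashMap::new();
    states.insert(1, State { tenant: Arc::downgrade(&live) });
    states.insert(2, State { tenant: Arc::downgrade(&dead) });
    drop(dead);
    cull(&mut states);
    assert!(states.contains_key(&1) && !states.contains_key(&2));
}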
+ tenants_uploading: HashMap, + + tasks: JoinSet<()>, + + /// Channel for our child tasks to send results to: we use a channel for results rather than + /// just getting task results via JoinSet because we need the channel's recv() "sleep until something + /// is available" semantic, rather than JoinSet::join_next()'s "sleep until next thing is available _or_ I'm empty" + /// behavior. + task_result_tx: tokio::sync::mpsc::UnboundedSender, + task_result_rx: tokio::sync::mpsc::UnboundedReceiver, + + concurrent_uploads: usize, + + scheduling_interval: Duration, +} + +/// The uploader task runs a loop that periodically wakes up and schedules tasks for +/// tenants that require an upload, or handles any commands that have been sent into +/// `command_queue`. No I/O is done in this loop: that all happens in the tasks we +/// spawn. +/// +/// Scheduling iterations are somewhat infrequent. However, each one will enqueue +/// all tenants that require an upload, and in between scheduling iterations we will +/// continue to spawn new tasks for pending tenants, as our concurrency limit permits. +/// +/// While we take a CancellationToken here, it is subordinate to the CancellationTokens +/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise +/// we might block waiting on a Tenant. +pub(super) async fn heatmap_uploader_task( + tenant_manager: Arc, + remote_storage: GenericRemoteStorage, + mut command_queue: tokio::sync::mpsc::Receiver>, + background_jobs_can_start: Barrier, + cancel: CancellationToken, +) -> anyhow::Result<()> { + let concurrent_uploads = tenant_manager.get_conf().heatmap_upload_concurrency; + + let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel(); + + let mut uploader = HeatmapUploader { + tenant_manager, + remote_storage, + cancel: cancel.clone(), + tasks: JoinSet::new(), + tenants: HashMap::new(), + tenants_pending: std::collections::VecDeque::new(), + tenants_uploading: HashMap::new(), + task_result_tx: result_tx, + task_result_rx: result_rx, + concurrent_uploads, + scheduling_interval: DEFAULT_SCHEDULING_INTERVAL, + }; + + tracing::info!("Waiting for background_jobs_can start..."); + background_jobs_can_start.wait().await; + tracing::info!("background_jobs_can is ready, proceeding."); + + while !cancel.is_cancelled() { + // Look for new work: this is relatively expensive because we have to go acquire the lock on + // the tenant manager to retrieve tenants, and then iterate over them to figure out which ones + // require an upload. + uploader.schedule_iteration().await?; + + // Between scheduling iterations, we will: + // - Drain any complete tasks and spawn pending tasks + // - Handle incoming administrative commands + // - Check our cancellation token + let next_scheduling_iteration = Instant::now() + .checked_add(uploader.scheduling_interval) + .unwrap_or_else(|| { + tracing::warn!( + "Scheduling interval invalid ({}s), running immediately!", + uploader.scheduling_interval.as_secs_f64() + ); + Instant::now() + }); + loop { + tokio::select! { + _ = cancel.cancelled() => { + // We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation. 
+ tracing::info!("Heatmap uploader joining tasks"); + while let Some(_r) = uploader.tasks.join_next().await {}; + tracing::info!("Heatmap uploader terminating"); + + break; + }, + _ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => { + tracing::debug!("heatmap_uploader_task: woke for scheduling interval"); + break;}, + cmd = command_queue.recv() => { + tracing::debug!("heatmap_uploader_task: woke for command queue"); + let cmd = match cmd { + Some(c) =>c, + None => { + // SecondaryController was destroyed, and this has raced with + // our CancellationToken + tracing::info!("Heatmap uploader terminating"); + cancel.cancel(); + break; + } + }; + + let CommandRequest{ + response_tx, + payload + } = cmd; + uploader.handle_command(payload, response_tx); + }, + _ = uploader.process_next_completion() => { + if !cancel.is_cancelled() { + uploader.spawn_pending(); + } + } + } + } + } + + Ok(()) +} + +impl HeatmapUploader { + /// Periodic execution phase: inspect all attached tenants and schedule any work they require. + async fn schedule_iteration(&mut self) -> anyhow::Result<()> { + // Cull any entries in self.tenants whose Arc is gone + self.tenants + .retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some()); + + // The priority order of previously scheduled work may be invalidated by current state: drop + // all pending work (it will be re-scheduled if still needed) + self.tenants_pending.clear(); + + // Used a fixed 'now' through the following loop, for efficiency and fairness. + let now = Instant::now(); + + // While iterating over the potentially-long list of tenants, we will periodically yield + // to avoid blocking executor. + const YIELD_ITERATIONS: usize = 1000; + + // Iterate over tenants looking for work to do. + let tenants = self.tenant_manager.get_attached_active_tenant_shards(); + for (i, tenant) in tenants.into_iter().enumerate() { + // Process is shutting down, drop out + if self.cancel.is_cancelled() { + return Ok(()); + } + + // Skip tenants that already have a write in flight + if self + .tenants_uploading + .contains_key(tenant.get_tenant_shard_id()) + { + continue; + } + + self.maybe_schedule_upload(&now, tenant); + + if i + 1 % YIELD_ITERATIONS == 0 { + tokio::task::yield_now().await; + } + } + + // Spawn tasks for as many of our pending tenants as we can. + self.spawn_pending(); + + Ok(()) + } + + /// + /// Cancellation: this method is cancel-safe. + async fn process_next_completion(&mut self) { + match self.task_result_rx.recv().await { + Some(r) => { + self.on_completion(r); + } + None => { + unreachable!("Result sender is stored on Self"); + } + } + } + + /// The 'maybe' refers to the tenant's state: whether it is configured + /// for heatmap uploads at all, and whether sufficient time has passed + /// since the last upload. + fn maybe_schedule_upload(&mut self, now: &Instant, tenant: Arc) { + match tenant.get_heatmap_period() { + None => { + // Heatmaps are disabled for this tenant + return; + } + Some(period) => { + // If any tenant has asked for uploads more frequent than our scheduling interval, + // reduce it to match so that we can keep up. This is mainly useful in testing, where + // we may set rather short intervals. 
+ if period < self.scheduling_interval { + self.scheduling_interval = std::cmp::max(period, MIN_SCHEDULING_INTERVAL); + } + } + } + + // Stale attachments do not upload anything: if we are in this state, there is probably some + // other attachment in mode Single or Multi running on another pageserver, and we don't + // want to thrash and overwrite their heatmap uploads. + if tenant.get_attach_mode() == AttachmentMode::Stale { + return; + } + + // Create an entry in self.tenants if one doesn't already exist: this will later be updated + // with the completion time in on_completion. + let state = self + .tenants + .entry(*tenant.get_tenant_shard_id()) + .or_insert_with(|| UploaderTenantState { + tenant: Arc::downgrade(&tenant), + last_upload: None, + next_upload: Some(Instant::now()), + last_digest: None, + }); + + // Decline to do the upload if insufficient time has passed + if state.next_upload.map(|nu| &nu > now).unwrap_or(false) { + return; + } + + let last_digest = state.last_digest; + self.tenants_pending.push_back(UploadPending { + tenant, + last_digest, + }) + } + + fn spawn_pending(&mut self) { + while !self.tenants_pending.is_empty() + && self.tenants_uploading.len() < self.concurrent_uploads + { + // unwrap: loop condition includes !is_empty() + let pending = self.tenants_pending.pop_front().unwrap(); + self.spawn_upload(pending.tenant, pending.last_digest); + } + } + + fn spawn_upload(&mut self, tenant: Arc, last_digest: Option) { + let remote_storage = self.remote_storage.clone(); + let tenant_shard_id = *tenant.get_tenant_shard_id(); + let (completion, barrier) = utils::completion::channel(); + let result_tx = self.task_result_tx.clone(); + self.tasks.spawn(async move { + // Guard for the barrier in [`WriteInProgress`] + let _completion = completion; + + let started_at = Instant::now(); + let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await { + Ok(UploadHeatmapOutcome::Uploaded(digest)) => { + let duration = Instant::now().duration_since(started_at); + SECONDARY_MODE + .upload_heatmap_duration + .observe(duration.as_secs_f64()); + SECONDARY_MODE.upload_heatmap.inc(); + Some(digest) + } + Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest, + Err(UploadHeatmapError::Upload(e)) => { + tracing::warn!( + "Failed to upload heatmap for tenant {}: {e:#}", + tenant.get_tenant_shard_id(), + ); + let duration = Instant::now().duration_since(started_at); + SECONDARY_MODE + .upload_heatmap_duration + .observe(duration.as_secs_f64()); + SECONDARY_MODE.upload_heatmap_errors.inc(); + last_digest + } + Err(UploadHeatmapError::Cancelled) => { + tracing::info!("Cancelled heatmap upload, shutting down"); + last_digest + } + }; + + let now = Instant::now(); + let next_upload = tenant + .get_heatmap_period() + .and_then(|period| now.checked_add(period)); + + result_tx + .send(WriteComplete { + tenant_shard_id: *tenant.get_tenant_shard_id(), + completed_at: now, + digest, + next_upload, + }) + .ok(); + }); + + self.tenants_uploading + .insert(tenant_shard_id, WriteInProgress { barrier }); + } + + #[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))] + fn on_completion(&mut self, completion: WriteComplete) { + tracing::debug!("Heatmap upload completed"); + let WriteComplete { + tenant_shard_id, + completed_at, + digest, + next_upload, + } = completion; + self.tenants_uploading.remove(&tenant_shard_id); + use std::collections::hash_map::Entry; + match 
self.tenants.entry(tenant_shard_id) { + Entry::Vacant(_) => { + // Tenant state was dropped, nothing to update. + } + Entry::Occupied(mut entry) => { + entry.get_mut().last_upload = Some(completed_at); + entry.get_mut().last_digest = digest; + entry.get_mut().next_upload = next_upload + } + } + } + + fn handle_command( + &mut self, + command: UploadCommand, + response_tx: tokio::sync::oneshot::Sender, + ) { + match command { + UploadCommand::Upload(tenant_shard_id) => { + // If an upload was ongoing for this tenant, let it finish first. + let barrier = if let Some(writing_state) = + self.tenants_uploading.get(&tenant_shard_id) + { + tracing::info!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Waiting for heatmap write to complete"); + writing_state.barrier.clone() + } else { + // Spawn the upload then immediately wait for it. This will block processing of other commands and + // starting of other background work. + tracing::info!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Starting heatmap write on command"); + let tenant = match self + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id, true) + { + Ok(t) => t, + Err(e) => { + // Drop result of send: we don't care if caller dropped their receiver + drop(response_tx.send(CommandResponse { + result: Err(e.into()), + })); + return; + } + }; + self.spawn_upload(tenant, None); + let writing_state = self + .tenants_uploading + .get(&tenant_shard_id) + .expect("We just inserted this"); + tracing::info!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Waiting for heatmap upload to complete"); + + writing_state.barrier.clone() + }; + + // This task does no I/O: it only listens for a barrier's completion and then + // sends to the command response channel. It is therefore safe to spawn this without + // any gates/task_mgr hooks. + tokio::task::spawn(async move { + barrier.wait().await; + + tracing::info!( + tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), + "Heatmap upload complete"); + + // Drop result of send: we don't care if caller dropped their receiver + drop(response_tx.send(CommandResponse { result: Ok(()) })) + }); + } + } + } +} + +enum UploadHeatmapOutcome { + /// We successfully wrote to remote storage, with this digest. + Uploaded(md5::Digest), + /// We did not upload because the heatmap digest was unchanged since the last upload + NoChange, + /// We skipped the upload for some reason, such as tenant/timeline not ready + Skipped, +} + +#[derive(thiserror::Error, Debug)] +enum UploadHeatmapError { + #[error("Cancelled")] + Cancelled, + + #[error(transparent)] + Upload(#[from] anyhow::Error), +} + +/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest +/// of the object we would have uploaded. +#[instrument(skip_all, fields(tenant_id = %tenant.get_tenant_shard_id().tenant_id, shard_id = %tenant.get_tenant_shard_id().shard_slug()))] +async fn upload_tenant_heatmap( + remote_storage: GenericRemoteStorage, + tenant: &Arc, + last_digest: Option, +) -> Result { + debug_assert_current_span_has_tenant_id(); + + let generation = tenant.get_generation(); + if generation.is_none() { + // We do not expect this: generations were implemented before heatmap uploads. 
However, + // handle it so that we don't have to make the generation in the heatmap an Option<> + // (Generation::none is not serializable) + tracing::warn!("Skipping heatmap upload for tenant with generation==None"); + return Ok(UploadHeatmapOutcome::Skipped); + } + + let mut heatmap = HeatMapTenant { + timelines: Vec::new(), + generation, + }; + let timelines = tenant.timelines.lock().unwrap().clone(); + + let tenant_cancel = tenant.cancel.clone(); + + // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise + // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind + // in remote storage. + let _guard = match tenant.gate.enter() { + Ok(g) => g, + Err(_) => { + tracing::info!("Skipping heatmap upload for tenant which is shutting down"); + return Err(UploadHeatmapError::Cancelled); + } + }; + + for (timeline_id, timeline) in timelines { + let heatmap_timeline = timeline.generate_heatmap().await; + match heatmap_timeline { + None => { + tracing::debug!( + "Skipping heatmap upload because timeline {timeline_id} is not ready" + ); + return Ok(UploadHeatmapOutcome::Skipped); + } + Some(heatmap_timeline) => { + heatmap.timelines.push(heatmap_timeline); + } + } + } + + // Serialize the heatmap + let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?; + let size = bytes.len(); + + // Drop out early if nothing changed since our last upload + let digest = md5::compute(&bytes); + if Some(digest) == last_digest { + return Ok(UploadHeatmapOutcome::NoChange); + } + + let path = remote_heatmap_path(tenant.get_tenant_shard_id()); + + // Write the heatmap. + tracing::debug!("Uploading {size} byte heatmap to {path}"); + if let Err(e) = backoff::retry( + || async { + let bytes = futures::stream::once(futures::future::ready(Ok(bytes::Bytes::from( + bytes.clone(), + )))); + remote_storage + .upload_storage_object(bytes, size, &path) + .await + }, + |_| false, + 3, + u32::MAX, + "Uploading heatmap", + backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")), + ) + .await + { + if tenant_cancel.is_cancelled() { + return Err(UploadHeatmapError::Cancelled); + } else { + return Err(e.into()); + } + } + + tracing::info!("Successfully uploaded {size} byte heatmap to {path}"); + + Ok(UploadHeatmapOutcome::Uploaded(digest)) +} diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 112128ead8..69a2893456 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -457,6 +457,8 @@ struct LayerInner { /// For loaded layers, this may be some other value if the tenant has undergone /// a shard split since the layer was originally written. 
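// Back in `upload_tenant_heatmap` above, the write is skipped when the md5 of the serialized
// heatmap matches the digest of the last successful upload. A minimal sketch of that
// change-detection step, using the same `md5` crate (`md5::compute`); the helper name is
// illustrative:
fn needs_upload(serialized: &[u8], last_digest: Option<md5::Digest>) -> (bool, md5::Digest) {
    let digest = md5::compute(serialized);
    // Equal digests mean the heatmap content is unchanged since the last upload.
    (Some(digest) != last_digest, digest)
}

fn main() {
    let body = br#"{"timelines":[]}"#;
    let (upload_first, digest) = needs_upload(body, None);
    let (upload_again, _) = needs_upload(body, Some(digest));
    assert!(upload_first && !upload_again);
}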
shard: ShardIndex, + + last_evicted_at: std::sync::Mutex>, } impl std::fmt::Display for LayerInner { @@ -587,6 +589,7 @@ impl LayerInner { consecutive_failures: AtomicUsize::new(0), generation, shard, + last_evicted_at: std::sync::Mutex::default(), } } @@ -722,6 +725,14 @@ impl LayerInner { permit }; + let since_last_eviction = + self.last_evicted_at.lock().unwrap().map(|ts| ts.elapsed()); + if let Some(since_last_eviction) = since_last_eviction { + // FIXME: this will not always be recorded correctly until #6028 (the no + // download needed branch above) + LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction); + } + let res = Arc::new(DownloadedLayer { owner: Arc::downgrade(self), kind: tokio::sync::OnceCell::default(), @@ -1117,6 +1128,8 @@ impl LayerInner { // we are still holding the permit, so no new spawn_download_and_wait can happen drop(self.status.send(Status::Evicted)); + *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now()); + res } @@ -1421,6 +1434,7 @@ pub(crate) struct LayerImplMetrics { rare_counters: enum_map::EnumMap, inits_cancelled: metrics::core::GenericCounter, + redownload_after: metrics::Histogram, } impl Default for LayerImplMetrics { @@ -1496,6 +1510,26 @@ impl Default for LayerImplMetrics { ) .unwrap(); + let redownload_after = { + let minute = 60.0; + let hour = 60.0 * minute; + metrics::register_histogram!( + "pageserver_layer_redownloaded_after", + "Time between evicting and re-downloading.", + vec![ + 10.0, + 30.0, + minute, + 5.0 * minute, + 15.0 * minute, + 30.0 * minute, + hour, + 12.0 * hour, + ] + ) + .unwrap() + }; + Self { started_evictions, completed_evictions, @@ -1507,6 +1541,7 @@ impl Default for LayerImplMetrics { rare_counters, inits_cancelled, + redownload_after, } } } @@ -1574,6 +1609,10 @@ impl LayerImplMetrics { fn inc_init_cancelled(&self) { self.inits_cancelled.inc() } + + fn record_redownloaded_after(&self, duration: std::time::Duration) { + self.redownload_after.observe(duration.as_secs_f64()) + } } #[derive(enum_map::Enum)] diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index dc23030218..4b118442f4 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -63,12 +63,10 @@ pub(crate) async fn concurrent_background_tasks_rate_limit( _ctx: &RequestContext, cancel: &CancellationToken, ) -> Result { - crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT + let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE .with_label_values(&[loop_kind.as_static_str()]) - .inc(); - scopeguard::defer!( - crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc(); - ); + .guard(); + tokio::select! 
{ permit = CONCURRENT_BACKGROUND_TASKS.acquire() => { match permit { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 81dbc04793..ac1922ccad 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -29,7 +29,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::{id::TenantTimelineId, sync::gate::Gate}; +use utils::sync::gate::Gate; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; @@ -98,8 +98,9 @@ use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; -use super::remote_timeline_client::index::IndexPart; +use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart}; use super::remote_timeline_client::RemoteTimelineClient; +use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -377,9 +378,6 @@ pub enum PageReconstructError { #[error(transparent)] Other(#[from] anyhow::Error), - /// The operation would require downloading a layer that is missing locally. - NeedsDownload(TenantTimelineId, LayerFileName), - /// The operation was cancelled Cancelled, @@ -408,14 +406,6 @@ impl std::fmt::Debug for PageReconstructError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { match self { Self::Other(err) => err.fmt(f), - Self::NeedsDownload(tenant_timeline_id, layer_file_name) => { - write!( - f, - "layer {}/{} needs download", - tenant_timeline_id, - layer_file_name.file_name() - ) - } Self::Cancelled => write!(f, "cancelled"), Self::AncestorStopping(timeline_id) => { write!(f, "ancestor timeline {timeline_id} is being stopped") @@ -429,14 +419,6 @@ impl std::fmt::Display for PageReconstructError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { match self { Self::Other(err) => err.fmt(f), - Self::NeedsDownload(tenant_timeline_id, layer_file_name) => { - write!( - f, - "layer {}/{} needs download", - tenant_timeline_id, - layer_file_name.file_name() - ) - } Self::Cancelled => write!(f, "cancelled"), Self::AncestorStopping(timeline_id) => { write!(f, "ancestor timeline {timeline_id} is being stopped") @@ -464,6 +446,12 @@ pub(crate) enum CompactFlags { ForceRepartition, } +impl std::fmt::Debug for Timeline { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Timeline<{}>", self.timeline_id) + } +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -1118,8 +1106,9 @@ impl Timeline { Ok(Some(true)) } - /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer. - /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. + /// Evict just one layer. + /// + /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { let _gate = self .gate @@ -1130,109 +1119,17 @@ impl Timeline { return Ok(None); }; - let Some(local_layer) = local_layer.keep_resident().await? 
else { - return Ok(Some(false)); - }; - - let local_layer: Layer = local_layer.into(); - - let remote_client = self + let rtc = self .remote_client .as_ref() .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; - let results = self - .evict_layer_batch(remote_client, &[local_layer]) - .await?; - assert_eq!(results.len(), 1); - let result: Option> = results.into_iter().next().unwrap(); - match result { - None => anyhow::bail!("task_mgr shutdown requested"), - Some(Ok(())) => Ok(Some(true)), - Some(Err(e)) => Err(anyhow::Error::new(e)), + match local_layer.evict_and_wait(rtc).await { + Ok(()) => Ok(Some(true)), + Err(EvictionError::NotFound) => Ok(Some(false)), + Err(EvictionError::Downloaded) => Ok(Some(false)), } } - - /// Evict a batch of layers. - pub(crate) async fn evict_layers( - &self, - layers_to_evict: &[Layer], - ) -> anyhow::Result>>> { - let _gate = self - .gate - .enter() - .map_err(|_| anyhow::anyhow!("Shutting down"))?; - - let remote_client = self - .remote_client - .as_ref() - .context("timeline must have RemoteTimelineClient")?; - - self.evict_layer_batch(remote_client, layers_to_evict).await - } - - /// Evict multiple layers at once, continuing through errors. - /// - /// The `remote_client` should be this timeline's `self.remote_client`. - /// We make the caller provide it so that they are responsible for handling the case - /// where someone wants to evict the layer but no remote storage is configured. - /// - /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`. - /// If `Err()` is returned, no eviction was attempted. - /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`. - /// Meaning of each `result[i]`: - /// - `Some(Err(...))` if layer replacement failed for some reason - /// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks) - /// - `Some(Ok(()))` if everything went well. - /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`. - async fn evict_layer_batch( - &self, - remote_client: &Arc, - layers_to_evict: &[Layer], - ) -> anyhow::Result>>> { - { - // to avoid racing with detach and delete_timeline - let state = self.current_state(); - anyhow::ensure!( - state == TimelineState::Active, - "timeline is not active but {state:?}" - ); - } - - let mut results = Vec::with_capacity(layers_to_evict.len()); - for _ in 0..layers_to_evict.len() { - results.push(None); - } - - let mut js = tokio::task::JoinSet::new(); - - for (i, l) in layers_to_evict.iter().enumerate() { - js.spawn({ - let l = l.to_owned(); - let remote_client = remote_client.clone(); - async move { (i, l.evict_and_wait(&remote_client).await) } - }); - } - - let join = async { - while let Some(next) = js.join_next().await { - match next { - Ok((i, res)) => results[i] = Some(res), - Err(je) if je.is_cancelled() => unreachable!("not used"), - Err(je) if je.is_panic() => { /* already logged */ } - Err(je) => tracing::error!("unknown JoinError: {je:?}"), - } - } - }; - - tokio::select! { - _ = self.cancel.cancelled() => {}, - _ = join => {} - } - - assert_eq!(results.len(), layers_to_evict.len()); - Ok(results) - } } /// Number of times we will compute partition within a checkpoint distance. @@ -2165,6 +2062,55 @@ impl Timeline { None } + + /// The timeline heatmap is a hint to secondary locations from the primary location, + /// indicating which layers are currently on-disk on the primary. 
+ /// + /// None is returned if the Timeline is in a state where uploading a heatmap + /// doesn't make sense, such as shutting down or initializing. The caller + /// should treat this as a cue to simply skip doing any heatmap uploading + /// for this timeline. + pub(crate) async fn generate_heatmap(&self) -> Option { + let eviction_info = self.get_local_layers_for_disk_usage_eviction().await; + + let remote_client = match &self.remote_client { + Some(c) => c, + None => return None, + }; + + let layer_file_names = eviction_info + .resident_layers + .iter() + .map(|l| l.layer.layer_desc().filename()) + .collect::>(); + + let decorated = match remote_client.get_layers_metadata(layer_file_names) { + Ok(d) => d, + Err(_) => { + // Getting metadata only fails on Timeline in bad state. + return None; + } + }; + + let heatmap_layers = std::iter::zip( + eviction_info.resident_layers.into_iter(), + decorated.into_iter(), + ) + .filter_map(|(layer, remote_info)| { + remote_info.map(|remote_info| { + HeatMapLayer::new( + layer.layer.layer_desc().filename(), + IndexLayerMetadata::from(remote_info), + layer.last_activity_ts, + ) + }) + }); + + Some(HeatMapTimeline::new( + self.timeline_id, + heatmap_layers.collect(), + )) + } } type TraversalId = String; @@ -4605,7 +4551,7 @@ mod tests { .await .unwrap(); - let rc = timeline + let rtc = timeline .remote_client .clone() .expect("just configured this"); @@ -4618,16 +4564,12 @@ mod tests { .expect("should had been resident") .drop_eviction_guard(); - let batch = [layer]; - - let first = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() }; - let second = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() }; + let first = async { layer.evict_and_wait(&rtc).await }; + let second = async { layer.evict_and_wait(&rtc).await }; let (first, second) = tokio::join!(first, second); - let (first, second) = (only_one(first), only_one(second)); - - let res = batch[0].keep_resident().await; + let res = layer.keep_resident().await; assert!(matches!(res, Ok(None)), "{res:?}"); match (first, second) { @@ -4648,14 +4590,6 @@ mod tests { RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error) } - fn only_one(mut input: Vec>) -> T { - assert_eq!(1, input.len()); - input - .pop() - .expect("length just checked") - .expect("no cancellation") - } - async fn find_some_layer(timeline: &Timeline) -> Layer { let layers = timeline.layers.read().await; let desc = layers diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 020c5a9e9f..782e8f9e39 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -212,11 +212,21 @@ impl Timeline { // Gather layers for eviction. // NB: all the checks can be invalidated as soon as we release the layer map lock. // We don't want to hold the layer map lock during eviction. + // So, we just need to deal with this. 
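// `generate_heatmap` above pairs each resident layer with its remote metadata and silently drops
// layers that have no remote entry yet. A std-only sketch of that zip + filter_map shape, with
// string/integer stand-ins for the real layer and metadata types:
fn pair_layers(resident: Vec<&str>, remote: Vec<Option<u64>>) -> Vec<(String, u64)> {
    std::iter::zip(resident, remote)
        .filter_map(|(layer, meta)| meta.map(|m| (layer.to_string(), m)))
        .collect()
}

fn main() {
    let paired = pair_layers(vec!["a", "b", "c"], vec![Some(1), None, Some(3)]);
    // "b" is dropped because it has no remote metadata yet.
    assert_eq!(paired, vec![("a".to_string(), 1), ("c".to_string(), 3)]);
}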
- let candidates: Vec<_> = { + + let remote_client = match self.remote_client.as_ref() { + Some(c) => c, + None => { + error!("no remote storage configured, cannot evict layers"); + return ControlFlow::Continue(()); + } + }; + + let mut js = tokio::task::JoinSet::new(); + { let guard = self.layers.read().await; let layers = guard.layer_map(); - let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { let hist_layer = guard.get_from_desc(&hist_layer); @@ -262,54 +272,49 @@ impl Timeline { continue; } }; + let layer = guard.drop_eviction_guard(); if no_activity_for > p.threshold { - candidates.push(guard.drop_eviction_guard()) + let remote_client = remote_client.clone(); + // this could cause a lot of allocations in some cases + js.spawn(async move { layer.evict_and_wait(&remote_client).await }); + stats.candidates += 1; } } - candidates - }; - stats.candidates = candidates.len(); - - let remote_client = match self.remote_client.as_ref() { - None => { - error!( - num_candidates = candidates.len(), - "no remote storage configured, cannot evict layers" - ); - return ControlFlow::Continue(()); - } - Some(c) => c, }; - let results = match self.evict_layer_batch(remote_client, &candidates).await { - Err(pre_err) => { - stats.errors += candidates.len(); - error!("could not do any evictions: {pre_err:#}"); - return ControlFlow::Continue(()); + let join_all = async move { + while let Some(next) = js.join_next().await { + match next { + Ok(Ok(())) => stats.evicted += 1, + Ok(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { + stats.not_evictable += 1; + } + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { + /* already logged */ + stats.errors += 1; + } + Err(je) => tracing::error!("unknown JoinError: {je:?}"), + } } - Ok(results) => results, + stats }; - assert_eq!(results.len(), candidates.len()); - for result in results { - match result { - None => { - stats.skipped_for_shutdown += 1; - } - Some(Ok(())) => { - stats.evicted += 1; - } - Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { - stats.not_evictable += 1; + + tokio::select! 
{ + stats = join_all => { + if stats.candidates == stats.not_evictable { + debug!(stats=?stats, "eviction iteration complete"); + } else if stats.errors > 0 || stats.not_evictable > 0 { + warn!(stats=?stats, "eviction iteration complete"); + } else { + info!(stats=?stats, "eviction iteration complete"); } } + _ = cancel.cancelled() => { + // just drop the joinset to "abort" + } } - if stats.candidates == stats.not_evictable { - debug!(stats=?stats, "eviction iteration complete"); - } else if stats.errors > 0 || stats.not_evictable > 0 { - warn!(stats=?stats, "eviction iteration complete"); - } else { - info!(stats=?stats, "eviction iteration complete"); - } + ControlFlow::Continue(()) } diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index 61130f541a..27d6fd9c28 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -19,14 +19,14 @@ use super::Timeline; pub struct UninitializedTimeline<'t> { pub(crate) owning_tenant: &'t Tenant, timeline_id: TimelineId, - raw_timeline: Option<(Arc, TimelineUninitMark)>, + raw_timeline: Option<(Arc, TimelineUninitMark<'t>)>, } impl<'t> UninitializedTimeline<'t> { pub(crate) fn new( owning_tenant: &'t Tenant, timeline_id: TimelineId, - raw_timeline: Option<(Arc, TimelineUninitMark)>, + raw_timeline: Option<(Arc, TimelineUninitMark<'t>)>, ) -> Self { Self { owning_tenant, @@ -169,18 +169,55 @@ pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) { /// /// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first. #[must_use] -pub(crate) struct TimelineUninitMark { +pub(crate) struct TimelineUninitMark<'t> { + owning_tenant: &'t Tenant, + timeline_id: TimelineId, uninit_mark_deleted: bool, uninit_mark_path: Utf8PathBuf, pub(crate) timeline_path: Utf8PathBuf, } -impl TimelineUninitMark { - pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self { - Self { - uninit_mark_deleted: false, - uninit_mark_path, - timeline_path, +/// Errors when acquiring exclusive access to a timeline ID for creation +#[derive(thiserror::Error, Debug)] +pub(crate) enum TimelineExclusionError { + #[error("Already exists")] + AlreadyExists(Arc), + #[error("Already creating")] + AlreadyCreating, + + // e.g. I/O errors, or some failure deep in postgres initdb + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl<'t> TimelineUninitMark<'t> { + pub(crate) fn new( + owning_tenant: &'t Tenant, + timeline_id: TimelineId, + uninit_mark_path: Utf8PathBuf, + timeline_path: Utf8PathBuf, + ) -> Result { + // Lock order: this is the only place we take both locks. 
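// The eviction loop in the hunk above now spawns one `evict_and_wait` per candidate into a
// JoinSet and tallies the outcomes as tasks complete. A minimal tokio sketch of that
// fan-out-and-tally shape; the `work` function and its error string are placeholders:
use tokio::task::JoinSet;

async fn work(i: u32) -> Result<(), &'static str> {
    if i % 2 == 0 { Ok(()) } else { Err("not evictable") }
}

#[tokio::main]
async fn main() {
    let mut js = JoinSet::new();
    for i in 0..10u32 {
        js.spawn(work(i));
    }

    let (mut ok, mut failed, mut panicked) = (0, 0, 0);
    while let Some(next) = js.join_next().await {
        match next {
            Ok(Ok(())) => ok += 1,
            Ok(Err(_)) => failed += 1,
            // A panicking task surfaces as a JoinError instead of poisoning the whole batch.
            Err(join_err) if join_err.is_panic() => panicked += 1,
            Err(join_err) => eprintln!("unexpected JoinError: {join_err:?}"),
        }
    }
    assert_eq!((ok, failed, panicked), (5, 5, 0));
}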
During drop() we only + // lock creating_timelines + let timelines = owning_tenant.timelines.lock().unwrap(); + let mut creating_timelines: std::sync::MutexGuard< + '_, + std::collections::HashSet, + > = owning_tenant.timelines_creating.lock().unwrap(); + + if let Some(existing) = timelines.get(&timeline_id) { + Err(TimelineExclusionError::AlreadyExists(existing.clone())) + } else if creating_timelines.contains(&timeline_id) { + Err(TimelineExclusionError::AlreadyCreating) + } else { + creating_timelines.insert(timeline_id); + Ok(Self { + owning_tenant, + timeline_id, + uninit_mark_deleted: false, + uninit_mark_path, + timeline_path, + }) } } @@ -207,7 +244,7 @@ impl TimelineUninitMark { } } -impl Drop for TimelineUninitMark { +impl Drop for TimelineUninitMark<'_> { fn drop(&mut self) { if !self.uninit_mark_deleted { if self.timeline_path.exists() { @@ -226,5 +263,11 @@ impl Drop for TimelineUninitMark { } } } + + self.owning_tenant + .timelines_creating + .lock() + .unwrap() + .remove(&self.timeline_id); } } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 738216afa5..16b245c488 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -2191,7 +2191,7 @@ mod tests { .load() .await; let tline = tenant - .bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx) + .bootstrap_timeline_test(TIMELINE_ID, pg_version, None, &ctx) .await .unwrap(); diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 1fa2d5599f..ae4c42bcb1 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -7,6 +7,8 @@ use proxy::console; use proxy::console::provider::AllowedIpsCache; use proxy::console::provider::NodeInfoCache; use proxy::http; +use proxy::rate_limiter::EndpointRateLimiter; +use proxy::rate_limiter::RateBucketInfo; use proxy::rate_limiter::RateLimiterConfig; use proxy::usage_metrics; @@ -14,6 +16,7 @@ use anyhow::bail; use proxy::config::{self, ProxyConfig}; use proxy::serverless; use std::pin::pin; +use std::sync::Arc; use std::{borrow::Cow, net::SocketAddr}; use tokio::net::TcpListener; use tokio::task::JoinSet; @@ -113,8 +116,11 @@ struct ProxyCliArgs { #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)] rate_limiter_timeout: tokio::time::Duration, /// Endpoint rate limiter max number of requests per second. - #[clap(long, default_value_t = 300)] - endpoint_rps_limit: u32, + /// + /// Provided in the form '@'. + /// Can be given multiple times for different bucket sizes. + #[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)] + endpoint_rps_limit: Vec, /// Initial limit for dynamic rate limiter. Makes sense only if `rate_limit_algorithm` is *not* `None`. #[clap(long, default_value_t = 100)] initial_limit: usize, @@ -157,6 +163,8 @@ async fn main() -> anyhow::Result<()> { let proxy_listener = TcpListener::bind(proxy_address).await?; let cancellation_token = CancellationToken::new(); + let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit)); + // client facing tasks. these will exit on error or on cancellation // cancellation returns Ok(()) let mut client_tasks = JoinSet::new(); @@ -164,6 +172,7 @@ async fn main() -> anyhow::Result<()> { config, proxy_listener, cancellation_token.clone(), + endpoint_rate_limiter.clone(), )); // TODO: rename the argument to something like serverless. 
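// `--endpoint-rps-limit` now takes repeated values parsed via FromStr, mirroring the
// `RateBucketInfo` parser further below; the "<rps>@<interval>" form is inferred from that
// Display impl and the CLI test above. A reduced sketch assuming clap with the derive feature
// plus the humantime and anyhow crates; the `Bucket` type is illustrative.
use std::str::FromStr;
use std::time::Duration;

use clap::Parser;

#[derive(Debug, Clone, Copy, PartialEq)]
struct Bucket {
    max_rps: u32,
    interval: Duration,
}

impl FromStr for Bucket {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (max_rps, interval) = s
            .split_once('@')
            .ok_or_else(|| anyhow::anyhow!("expected '<rps>@<interval>'"))?;
        Ok(Bucket {
            max_rps: max_rps.parse()?,
            interval: humantime::parse_duration(interval)?,
        })
    }
}

#[derive(Parser)]
struct Args {
    /// May be given multiple times, once per bucket size.
    #[clap(long)]
    endpoint_rps_limit: Vec<Bucket>,
}

fn main() {
    let args = Args::parse_from([
        "proxy",
        "--endpoint-rps-limit", "100@1s",
        "--endpoint-rps-limit", "20@30s",
    ]);
    assert_eq!(args.endpoint_rps_limit.len(), 2);
    assert_eq!(
        args.endpoint_rps_limit[0],
        Bucket { max_rps: 100, interval: Duration::from_secs(1) }
    );
}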
@@ -177,6 +186,7 @@ async fn main() -> anyhow::Result<()> { config, serverless_listener, cancellation_token.clone(), + endpoint_rate_limiter.clone(), )); } @@ -311,6 +321,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { let authentication_config = AuthenticationConfig { scram_protocol_timeout: args.scram_protocol_timeout, }; + + let mut endpoint_rps_limit = args.endpoint_rps_limit.clone(); + RateBucketInfo::validate(&mut endpoint_rps_limit)?; + let config = Box::leak(Box::new(ProxyConfig { tls_config, auth_backend, @@ -320,8 +334,35 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { authentication_config, require_client_ip: args.require_client_ip, disable_ip_check_for_http: args.disable_ip_check_for_http, - endpoint_rps_limit: args.endpoint_rps_limit, + endpoint_rps_limit, })); Ok(config) } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use clap::Parser; + use proxy::rate_limiter::RateBucketInfo; + + #[test] + fn parse_endpoint_rps_limit() { + let config = super::ProxyCliArgs::parse_from([ + "proxy", + "--endpoint-rps-limit", + "100@1s", + "--endpoint-rps-limit", + "20@30s", + ]); + + assert_eq!( + config.endpoint_rps_limit, + vec![ + RateBucketInfo::new(100, Duration::from_secs(1)), + RateBucketInfo::new(20, Duration::from_secs(30)), + ] + ); + } +} diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 78c56300a5..f5f7270bf4 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,9 +1,13 @@ use crate::{ - auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError, - error::UserFacingError, proxy::neon_option, + auth::parse_endpoint_param, + cancellation::CancelClosure, + console::errors::WakeComputeError, + error::UserFacingError, + proxy::{neon_option, NUM_DB_CONNECTIONS_GAUGE}, }; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; +use metrics::IntCounterPairGuard; use pq_proto::StartupMessageParams; use std::{io, net::SocketAddr, time::Duration}; use thiserror::Error; @@ -223,6 +227,8 @@ pub struct PostgresConnection { pub params: std::collections::HashMap, /// Query cancellation token. 
pub cancel_closure: CancelClosure, + + _guage: IntCounterPairGuard, } impl ConnCfg { @@ -231,6 +237,7 @@ impl ConnCfg { &self, allow_self_signed_compute: bool, timeout: Duration, + proto: &'static str, ) -> Result { let (socket_addr, stream, host) = self.connect_raw(timeout).await?; @@ -264,6 +271,7 @@ impl ConnCfg { stream, params, cancel_closure, + _guage: NUM_DB_CONNECTIONS_GAUGE.with_label_values(&[proto]).guard(), }; Ok(connection) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index dea446eb22..f932df4058 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,4 +1,4 @@ -use crate::auth; +use crate::{auth, rate_limiter::RateBucketInfo}; use anyhow::{bail, ensure, Context, Ok}; use rustls::{sign, Certificate, PrivateKey}; use sha2::{Digest, Sha256}; @@ -20,7 +20,7 @@ pub struct ProxyConfig { pub authentication_config: AuthenticationConfig, pub require_client_ip: bool, pub disable_ip_check_for_http: bool, - pub endpoint_rps_limit: u32, + pub endpoint_rps_limit: Vec, } #[derive(Debug)] diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index ae8b294841..da65065179 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -17,7 +17,10 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use futures::TryFutureExt; use itertools::Itertools; -use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec}; +use metrics::{ + exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec, + IntCounterPairVec, IntCounterVec, +}; use once_cell::sync::{Lazy, OnceCell}; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; use prometheus::{ @@ -44,17 +47,10 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "proxy_opened_db_connections_total", "Number of opened connections to a database.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( "proxy_closed_db_connections_total", "Number of closed connections to a database.", &["protocol"], @@ -62,17 +58,10 @@ pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| .unwrap() }); -pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "proxy_opened_client_connections_total", "Number of opened connections from a client.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( "proxy_closed_client_connections_total", "Number of closed connections from a client.", &["protocol"], @@ -80,17 +69,10 @@ pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new .unwrap() }); -pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "proxy_accepted_connections_total", "Number of client connections accepted.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( "proxy_closed_connections_total", 
"Number of client connections closed.", &["protocol"], @@ -297,6 +279,7 @@ pub async fn task_main( config: &'static ProxyConfig, listener: tokio::net::TcpListener, cancellation_token: CancellationToken, + endpoint_rate_limiter: Arc, ) -> anyhow::Result<()> { scopeguard::defer! { info!("proxy has shut down"); @@ -308,7 +291,6 @@ pub async fn task_main( let connections = tokio_util::task::task_tracker::TaskTracker::new(); let cancel_map = Arc::new(CancelMap::default()); - let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(config.endpoint_rps_limit)); while let Some(accept_result) = run_until_cancelled(listener.accept(), &cancellation_token).await @@ -428,16 +410,12 @@ pub async fn handle_client( ); let proto = mode.protocol_label(); - NUM_CLIENT_CONNECTION_OPENED_COUNTER + let _client_gauge = NUM_CLIENT_CONNECTION_GAUGE .with_label_values(&[proto]) - .inc(); - NUM_CONNECTIONS_ACCEPTED_COUNTER + .guard(); + let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE .with_label_values(&[proto]) - .inc(); - scopeguard::defer! { - NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc(); - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); - } + .guard(); let tls = config.tls_config.as_ref(); @@ -584,12 +562,13 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, timeout: time::Duration, + proto: &'static str, ) -> Result { let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config - .connect(allow_self_signed_compute, timeout) + .connect(allow_self_signed_compute, timeout, proto) .await } @@ -610,6 +589,7 @@ pub trait ConnectMechanism { pub struct TcpMechanism<'a> { /// KV-dictionary with PostgreSQL connection params. pub params: &'a StartupMessageParams, + pub proto: &'static str, } #[async_trait] @@ -623,7 +603,7 @@ impl ConnectMechanism for TcpMechanism<'_> { node_info: &console::CachedNodeInfo, timeout: time::Duration, ) -> Result { - connect_to_compute_once(node_info, timeout).await + connect_to_compute_once(node_info, timeout, self.proto).await } fn update_connect_config(&self, config: &mut compute::ConnCfg) { @@ -1028,7 +1008,7 @@ impl Client<'_, S> { let aux = node_info.aux.clone(); let mut node = connect_to_compute( - &TcpMechanism { params }, + &TcpMechanism { params, proto }, node_info, &extra, &creds, @@ -1037,13 +1017,6 @@ impl Client<'_, S> { .or_else(|e| stream.throw_error(e)) .await?; - NUM_DB_CONNECTIONS_OPENED_COUNTER - .with_label_values(&[proto]) - .inc(); - scopeguard::defer! { - NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); - } - prepare_client_connection(&node, session, &mut stream).await?; // Before proxy passing, forward to compute whatever data is left in the // PqStream input buffer. 
Normally there is none, but our serverless npm diff --git a/proxy/src/rate_limiter.rs b/proxy/src/rate_limiter.rs index f40b8dbd1c..b26386d159 100644 --- a/proxy/src/rate_limiter.rs +++ b/proxy/src/rate_limiter.rs @@ -3,5 +3,5 @@ mod limit_algorithm; mod limiter; pub use aimd::Aimd; pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig}; -pub use limiter::EndpointRateLimiter; pub use limiter::Limiter; +pub use limiter::{EndpointRateLimiter, RateBucketInfo}; diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index 9d28bb67b3..87c1597ca9 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -1,16 +1,15 @@ -use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; +use anyhow::bail; use dashmap::DashMap; -use parking_lot::Mutex; +use itertools::Itertools; +use rand::{thread_rng, Rng}; use smol_str::SmolStr; use tokio::sync::{Mutex as AsyncMutex, Semaphore, SemaphorePermit}; -use tokio::time::{timeout, Instant}; +use tokio::time::{timeout, Duration, Instant}; use tracing::info; use super::{ @@ -29,60 +28,156 @@ use super::{ // saw SNI, before doing TLS handshake. User-side error messages in that case // does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now // I went with a more expensive way that yields user-friendlier error messages. -// -// TODO: add a better bucketing here, e.g. not more than 300 requests per second, -// and not more than 1000 requests per 10 seconds, etc. Short bursts of reconnects -// are noramal during redeployments, so we should not block them. pub struct EndpointRateLimiter { - map: DashMap>>, - max_rps: u32, + map: DashMap>, + info: &'static [RateBucketInfo], access_count: AtomicUsize, } -impl EndpointRateLimiter { - pub fn new(max_rps: u32) -> Self { +#[derive(Clone, Copy)] +struct RateBucket { + start: Instant, + count: u32, +} + +impl RateBucket { + fn should_allow_request(&mut self, info: &RateBucketInfo, now: Instant) -> bool { + if now - self.start < info.interval { + self.count < info.max_rpi + } else { + // bucket expired, reset + self.count = 0; + self.start = now; + + true + } + } + + fn inc(&mut self) { + self.count += 1; + } +} + +#[derive(Clone, Copy, PartialEq)] +pub struct RateBucketInfo { + pub interval: Duration, + // requests per interval + pub max_rpi: u32, +} + +impl std::fmt::Display for RateBucketInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let rps = self.max_rpi * 1000 / self.interval.as_millis() as u32; + write!(f, "{rps}@{}", humantime::format_duration(self.interval)) + } +} + +impl std::fmt::Debug for RateBucketInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self}") + } +} + +impl std::str::FromStr for RateBucketInfo { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let Some((max_rps, interval)) = s.split_once('@') else { + bail!("invalid rate info") + }; + let max_rps = max_rps.parse()?; + let interval = humantime::parse_duration(interval)?; + Ok(Self::new(max_rps, interval)) + } +} + +impl RateBucketInfo { + pub const DEFAULT_SET: [Self; 3] = [ + Self::new(300, Duration::from_secs(1)), + Self::new(200, Duration::from_secs(60)), + Self::new(100, Duration::from_secs(600)), + ]; + + pub fn validate(info: &mut [Self]) -> anyhow::Result<()> { + info.sort_unstable_by_key(|info| info.interval); + let invalid = info + .iter() + .tuple_windows() 
+ .find(|(a, b)| a.max_rpi > b.max_rpi); + if let Some((a, b)) = invalid { + bail!( + "invalid endpoint RPS limits. {b} allows fewer requests per bucket than {a} ({} vs {})", + b.max_rpi, + a.max_rpi, + ); + } + + Ok(()) + } + + pub const fn new(max_rps: u32, interval: Duration) -> Self { Self { - map: DashMap::new(), - max_rps, + interval, + max_rpi: max_rps * interval.as_millis() as u32 / 1000, + } + } +} + +impl EndpointRateLimiter { + pub fn new(info: &'static [RateBucketInfo]) -> Self { + info!(buckets = ?info, "endpoint rate limiter"); + Self { + info, + map: DashMap::with_shard_amount(64), access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request } } /// Check that number of connections to the endpoint is below `max_rps` rps. pub fn check(&self, endpoint: SmolStr) -> bool { - // do GC every 100k requests (worst case memory usage is about 10MB) - if self.access_count.fetch_add(1, Ordering::AcqRel) % 100_000 == 0 { + // do a partial GC every 2k requests. This cleans up ~ 1/64th of the map. + // worst case memory usage is about: + // = 2 * 2048 * 64 * (48B + 72B) + // = 30MB + if self.access_count.fetch_add(1, Ordering::AcqRel) % 2048 == 0 { self.do_gc(); } - let now = chrono::Utc::now().naive_utc().time(); - let entry = self - .map - .entry(endpoint) - .or_insert_with(|| Arc::new(Mutex::new((now, 0)))); - let mut entry = entry.lock(); - let (last_time, count) = *entry; + let now = Instant::now(); + let mut entry = self.map.entry(endpoint).or_insert_with(|| { + vec![ + RateBucket { + start: now, + count: 0, + }; + self.info.len() + ] + }); - if now - last_time < chrono::Duration::seconds(1) { - if count >= self.max_rps { - return false; - } - *entry = (last_time, count + 1); - } else { - *entry = (now, 1); + let should_allow_request = entry + .iter_mut() + .zip(self.info) + .all(|(bucket, info)| bucket.should_allow_request(info, now)); + + if should_allow_request { + // only increment the bucket counts if the request will actually be accepted + entry.iter_mut().for_each(RateBucket::inc); } - true + + should_allow_request } - /// Clean the map. Simple strategy: remove all entries. At worst, we'll - /// double the effective max_rps during the cleanup. But that way deletion - /// does not aquire mutex on each entry access. + /// Clean the map. Simple strategy: remove all entries in a random shard. + /// At worst, we'll double the effective max_rps during the cleanup. + /// But that way deletion does not aquire mutex on each entry access. 
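
A minimal usage sketch of the new multi-bucket limiter, mirroring the unit tests later in this hunk. The `build_limiter` helper and the `proxy::rate_limiter` import path are assumptions for illustration, not part of this patch; the `RateBucketInfo`/`EndpointRateLimiter` calls follow the signatures visible above.

```rust
use std::sync::Arc;

use proxy::rate_limiter::{EndpointRateLimiter, RateBucketInfo};

/// Hypothetical helper: parse `rps@interval` strings, validate that longer
/// windows never allow fewer requests per bucket than shorter ones, and build
/// the shared limiter handed to the listeners.
fn build_limiter(args: &[&str]) -> anyhow::Result<Arc<EndpointRateLimiter>> {
    let mut buckets: Vec<RateBucketInfo> = args
        .iter()
        .map(|s| s.parse())
        .collect::<Result<_, _>>()?;
    // Sorts by interval and rejects sets like ["300@1s", "10@10s"].
    RateBucketInfo::validate(&mut buckets)?;
    // `EndpointRateLimiter::new` takes `&'static [RateBucketInfo]`, hence the
    // leak; the tests below construct it the same way.
    Ok(Arc::new(EndpointRateLimiter::new(Vec::leak(buckets))))
}

fn main() -> anyhow::Result<()> {
    // Equivalent to RateBucketInfo::DEFAULT_SET.
    let limiter = build_limiter(&["300@1s", "200@1m", "100@10m"])?;
    // A request is admitted only if every bucket still has room.
    assert!(limiter.check("ep-my-endpoint-1234".into()));
    Ok(())
}
```

Note that the per-bucket budget is requests per interval (`max_rpi`), so `"200@1m"` allows 12000 requests in any rolling-ish 60s window, not 200.
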
pub fn do_gc(&self) { info!( "cleaning up endpoint rate limiter, current size = {}", self.map.len() ); - self.map.clear(); + let n = self.map.shards().len(); + let shard = thread_rng().gen_range(0..n); + self.map.shards()[shard].write().clear(); } } @@ -398,9 +493,11 @@ mod tests { use std::{pin::pin, task::Context, time::Duration}; use futures::{task::noop_waker_ref, Future}; + use smol_str::SmolStr; + use tokio::time; - use super::{Limiter, Outcome}; - use crate::rate_limiter::RateLimitAlgorithm; + use super::{EndpointRateLimiter, Limiter, Outcome}; + use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm}; #[tokio::test] async fn it_works() { @@ -509,4 +606,88 @@ mod tests { limiter.release(token1, None).await; limiter.release(token2, None).await; } + + #[test] + fn rate_bucket_rpi() { + let rate_bucket = RateBucketInfo::new(50, Duration::from_secs(5)); + assert_eq!(rate_bucket.max_rpi, 50 * 5); + + let rate_bucket = RateBucketInfo::new(50, Duration::from_millis(500)); + assert_eq!(rate_bucket.max_rpi, 50 / 2); + } + + #[test] + fn rate_bucket_parse() { + let rate_bucket: RateBucketInfo = "100@10s".parse().unwrap(); + assert_eq!(rate_bucket.interval, Duration::from_secs(10)); + assert_eq!(rate_bucket.max_rpi, 100 * 10); + assert_eq!(rate_bucket.to_string(), "100@10s"); + + let rate_bucket: RateBucketInfo = "100@1m".parse().unwrap(); + assert_eq!(rate_bucket.interval, Duration::from_secs(60)); + assert_eq!(rate_bucket.max_rpi, 100 * 60); + assert_eq!(rate_bucket.to_string(), "100@1m"); + } + + #[test] + fn default_rate_buckets() { + let mut defaults = RateBucketInfo::DEFAULT_SET; + RateBucketInfo::validate(&mut defaults[..]).unwrap(); + } + + #[test] + #[should_panic = "invalid endpoint RPS limits. 10@10s allows fewer requests per bucket than 300@1s (100 vs 300)"] + fn rate_buckets_validate() { + let mut rates: Vec = ["300@1s", "10@10s"] + .into_iter() + .map(|s| s.parse().unwrap()) + .collect(); + RateBucketInfo::validate(&mut rates).unwrap(); + } + + #[tokio::test] + async fn test_rate_limits() { + let mut rates: Vec = ["100@1s", "20@30s"] + .into_iter() + .map(|s| s.parse().unwrap()) + .collect(); + RateBucketInfo::validate(&mut rates).unwrap(); + let limiter = EndpointRateLimiter::new(Vec::leak(rates)); + + let endpoint = SmolStr::from("ep-my-endpoint-1234"); + + time::pause(); + + for _ in 0..100 { + assert!(limiter.check(endpoint.clone())); + } + // more connections fail + assert!(!limiter.check(endpoint.clone())); + + // fail even after 500ms as it's in the same bucket + time::advance(time::Duration::from_millis(500)).await; + assert!(!limiter.check(endpoint.clone())); + + // after a full 1s, 100 requests are allowed again + time::advance(time::Duration::from_millis(500)).await; + for _ in 1..6 { + for _ in 0..100 { + assert!(limiter.check(endpoint.clone())); + } + time::advance(time::Duration::from_millis(1000)).await; + } + + // more connections after 600 will exceed the 20rps@30s limit + assert!(!limiter.check(endpoint.clone())); + + // will still fail before the 30 second limit + time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await; + assert!(!limiter.check(endpoint.clone())); + + // after the full 30 seconds, 100 requests are allowed again + time::advance(time::Duration::from_millis(1)).await; + for _ in 0..100 { + assert!(limiter.check(endpoint.clone())); + } + } } diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index 92d6e2d851..870e9c1103 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -8,12 +8,13 @@ mod websocket; 
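
The metrics changes in this diff (proxy above, serverless and safekeeper below) all apply the same pattern: the opened/closed counters are registered as a pair and driven by a RAII guard instead of manual `inc()` calls plus `scopeguard::defer!`. A small sketch of that usage; the `example_*` metric names and functions are assumptions, while the macro and `guard()` calls follow the usage shown in the hunks.

```rust
use metrics::{register_int_counter_pair_vec, IntCounterPairVec};
use once_cell::sync::Lazy;

static EXAMPLE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
    register_int_counter_pair_vec!(
        "example_opened_connections_total",
        "Number of opened example connections.",
        "example_closed_connections_total",
        "Number of closed example connections.",
        &["protocol"],
    )
    .unwrap()
});

fn handle_connection(proto: &'static str) {
    // Creating the guard increments the "opened" counter; dropping it
    // increments the "closed" counter, even on early return.
    let _guard = EXAMPLE_CONNECTIONS.with_label_values(&[proto]).guard();
    // ... serve the connection ...
}

fn spawn_background(proto: &'static str) {
    // When the work outlives the current scope, the guard is created first
    // and moved into the task (as the conn_pool hunk below does with
    // `conn_gauge`), so "closed" only ticks when the task itself finishes.
    let gauge = EXAMPLE_CONNECTIONS.with_label_values(&[proto]).guard();
    tokio::spawn(async move {
        let _gauge = gauge;
        // ... background work ...
    });
}
```
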
use anyhow::bail; use hyper::StatusCode; +use metrics::IntCounterPairGuard; pub use reqwest_middleware::{ClientWithMiddleware, Error}; pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio_util::task::TaskTracker; use crate::protocol2::{ProxyProtocolAccept, WithClientIp}; -use crate::proxy::{NUM_CLIENT_CONNECTION_CLOSED_COUNTER, NUM_CLIENT_CONNECTION_OPENED_COUNTER}; +use crate::proxy::NUM_CLIENT_CONNECTION_GAUGE; use crate::rate_limiter::EndpointRateLimiter; use crate::{cancellation::CancelMap, config::ProxyConfig}; use futures::StreamExt; @@ -38,13 +39,13 @@ pub async fn task_main( config: &'static ProxyConfig, ws_listener: TcpListener, cancellation_token: CancellationToken, + endpoint_rate_limiter: Arc, ) -> anyhow::Result<()> { scopeguard::defer! { info!("websocket server has shut down"); } let conn_pool = conn_pool::GlobalConnPool::new(config); - let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(config.endpoint_rps_limit)); // shutdown the connection pool tokio::spawn({ @@ -149,22 +150,17 @@ pub async fn task_main( struct MetricService { inner: S, + _gauge: IntCounterPairGuard, } impl MetricService { fn new(inner: S) -> MetricService { - NUM_CLIENT_CONNECTION_OPENED_COUNTER - .with_label_values(&["http"]) - .inc(); - MetricService { inner } - } -} - -impl Drop for MetricService { - fn drop(&mut self) { - NUM_CLIENT_CONNECTION_CLOSED_COUNTER - .with_label_values(&["http"]) - .inc(); + MetricService { + inner, + _gauge: NUM_CLIENT_CONNECTION_GAUGE + .with_label_values(&["http"]) + .guard(), + } } } @@ -248,7 +244,7 @@ async fn request_handler( .header("Access-Control-Allow-Origin", "*") .header( "Access-Control-Allow-Headers", - "Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In", + "Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In, Neon-Batch-Read-Only, Neon-Batch-Isolation-Level", ) .header("Access-Control-Max-Age", "86400" /* 24 hours */) .status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 4f3b31b9be..69198d79d3 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -24,10 +24,7 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus}; use crate::{ auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list}, console, - proxy::{ - neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, - NUM_DB_CONNECTIONS_OPENED_COUNTER, - }, + proxy::{neon_options, LatencyTimer, NUM_DB_CONNECTIONS_GAUGE}, usage_metrics::{Ids, MetricCounter, USAGE_METRICS}, }; use crate::{compute, config}; @@ -477,6 +474,11 @@ async fn connect_to_compute_once( .connect_timeout(timeout) .connect(tokio_postgres::NoTls) .await?; + + let conn_gauge = NUM_DB_CONNECTIONS_GAUGE + .with_label_values(&["http"]) + .guard(); + tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id())); let (tx, mut rx) = tokio::sync::watch::channel(session); @@ -492,10 +494,7 @@ async fn connect_to_compute_once( tokio::spawn( async move { - NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc(); - scopeguard::defer! 
{ - NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); - } + let _conn_gauge = conn_gauge; poll_fn(move |cx| { if matches!(rx.has_changed(), Ok(true)) { session = *rx.borrow_and_update(); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 6e80260193..795ba819c1 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -29,7 +29,7 @@ use utils::http::error::ApiError; use utils::http::json::json_response; use crate::config::HttpConfig; -use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER}; +use crate::proxy::NUM_CONNECTION_REQUESTS_GAUGE; use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; @@ -303,12 +303,9 @@ async fn handle_inner( session_id: uuid::Uuid, peer_addr: IpAddr, ) -> anyhow::Result> { - NUM_CONNECTIONS_ACCEPTED_COUNTER + let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE .with_label_values(&["http"]) - .inc(); - scopeguard::defer! { - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); - } + .guard(); // // Determine the destination and connection params diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index d5333abae6..761541168c 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -11,7 +11,7 @@ use tracing::{debug, info, info_span, Instrument}; use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; -use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED}; +use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE}; use crate::safekeeper::Term; use crate::timeline::TimelineError; use crate::wal_service::ConnectionId; @@ -210,10 +210,7 @@ impl postgres_backend::Handler let cmd = parse_cmd(query_string)?; let cmd_str = cmd_to_string(&cmd); - PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc(); - scopeguard::defer! 
{ - PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc(); - } + let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard(); info!("got query {:?}", query_string); diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 0711beb290..11a3f48922 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -11,7 +11,8 @@ use futures::Future; use metrics::{ core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, proto::MetricFamily, - register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec, + register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge, + IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec, }; use once_cell::sync::Lazy; @@ -89,16 +90,10 @@ pub static BROKER_PULLED_UPDATES: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_broker_pulled_updates_total counter") }); -pub static PG_QUERIES_RECEIVED: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static PG_QUERIES_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "safekeeper_pg_queries_received_total", "Number of queries received through pg protocol", - &["query"] - ) - .expect("Failed to register safekeeper_pg_queries_received_total counter") -}); -pub static PG_QUERIES_FINISHED: Lazy = Lazy::new(|| { - register_int_counter_vec!( "safekeeper_pg_queries_finished_total", "Number of queries finished through pg protocol", &["query"] diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4b23650960..7dfdd9274d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1870,11 +1870,12 @@ class NeonPageserver(PgProtocol): tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None, auth_token: Optional[str] = None, + generation: Optional[int] = None, ) -> TenantId: + if generation is None: + generation = self.maybe_get_generation(tenant_id) client = self.http_client(auth_token=auth_token) - return client.tenant_create( - tenant_id, conf, generation=self.maybe_get_generation(tenant_id) - ) + return client.tenant_create(tenant_id, conf, generation=generation) def tenant_load(self, tenant_id: TenantId): client = self.http_client() diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index b46ddf5527..eda8813c36 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -322,6 +322,10 @@ class PageserverHttpClient(requests.Session): self.verbose_error(res) return TenantConfig.from_json(res.json()) + def tenant_heatmap_upload(self, tenant_id: TenantId): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload") + self.verbose_error(res) + def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]): assert "tenant_id" not in config.keys() res = self.put( diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 824531bea4..c0c2383feb 100644 --- a/test_runner/fixtures/remote_storage.py +++ b/test_runner/fixtures/remote_storage.py @@ -16,6 +16,7 @@ from fixtures.log_helper import log from fixtures.types import TenantId, TimelineId TIMELINE_INDEX_PART_FILE_NAME = "index_part.json" +TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json" @enum.unique @@ -133,6 +134,13 @@ class LocalFsStorage: with self.index_path(tenant_id, timeline_id).open("r") as f: return json.load(f) + def heatmap_path(self, tenant_id: TenantId) -> Path: + return 
self.tenant_path(tenant_id) / TENANT_HEATMAP_FILE_NAME + + def heatmap_content(self, tenant_id): + with self.heatmap_path(tenant_id).open("r") as f: + return json.load(f) + def to_toml_inline_table(self) -> str: rv = { "local_path": str(self.root), diff --git a/test_runner/performance/test_bulk_insert.py b/test_runner/performance/test_bulk_insert.py index a146e011cc..a2a1fa11e5 100644 --- a/test_runner/performance/test_bulk_insert.py +++ b/test_runner/performance/test_bulk_insert.py @@ -55,9 +55,20 @@ def measure_recovery_time(env: NeonCompare): # Delete the Tenant in the pageserver: this will drop local and remote layers, such that # when we "create" the Tenant again, we will replay the WAL from the beginning. + # + # This is a "weird" thing to do, and can confuse the attachment service as we're re-using + # the same tenant ID for a tenant that is logically different from the pageserver's point + # of view, but the same as far as the safekeeper/WAL is concerned. To work around that, + # we will explicitly create the tenant in the same generation that it was previously + # attached in. + assert env.env.attachment_service is not None + attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant) + assert attach_status is not None + (attach_gen, _) = attach_status + client.tenant_delete(env.tenant) wait_tenant_status_404(client, env.tenant, iterations=60, interval=0.5) - env.env.pageserver.tenant_create(tenant_id=env.tenant) + env.env.pageserver.tenant_create(tenant_id=env.tenant, generation=attach_gen) # Measure recovery time with env.record_duration("wal_recovery"): diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 70d386a566..d2bd53d8aa 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -163,6 +163,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "gc_feedback": True, "gc_horizon": 23 * (1024 * 1024), "gc_period": "2h 13m", + "heatmap_period": "10m", "image_creation_threshold": 7, "pitr_interval": "1m", "lagging_wal_timeout": "23m", diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 82ca985d01..9a0b91b54e 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -1,8 +1,7 @@ import random import threading import time -from queue import SimpleQueue -from typing import Any, Dict, List, Union +from typing import List import pytest from fixtures.log_helper import log @@ -239,92 +238,6 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder t.join() -def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder): - """ - If the activate only after upload is used, then retries could become competing. 
- """ - - env = neon_env_builder.init_configs() - env.start() - - env.pageserver.allowed_errors.extend( - [ - ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*", - ".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory", - ] - ) - ps_http = env.pageserver.http_client() - - # pause all uploads - ps_http.configure_failpoints(("before-upload-index-pausable", "pause")) - env.pageserver.tenant_create(env.initial_tenant) - - def start_creating_timeline(): - ps_http.timeline_create( - env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60 - ) - - create_root = threading.Thread(target=start_creating_timeline) - - branch_id = TimelineId.generate() - - queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue() - barrier = threading.Barrier(3) - - def try_branch(): - barrier.wait() - barrier.wait() - try: - ret = ps_http.timeline_create( - env.pg_version, - env.initial_tenant, - branch_id, - ancestor_timeline_id=env.initial_timeline, - timeout=5, - ) - queue.put(ret) - except Exception as e: - queue.put(e) - - threads = [threading.Thread(target=try_branch) for _ in range(2)] - - try: - create_root.start() - - for t in threads: - t.start() - - wait_until_paused(env, "before-upload-index-pausable") - - barrier.wait() - ps_http.configure_failpoints(("before-upload-index-pausable", "off")) - barrier.wait() - - # now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files - first = queue.get() - second = queue.get() - - log.info(first) - log.info(second) - - (succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first) - assert isinstance(failed, Exception) - assert isinstance(succeeded, Dict) - - # there's multiple valid status codes: - # - Timeline x/y already exists - # - whatever 409 response says, but that is a subclass of PageserverApiException - assert isinstance(failed, PageserverApiException) - assert succeeded["state"] == "Active" - finally: - # we might still have the failpoint active - env.pageserver.stop(immediate=True) - - for t in threads: - t.join() - create_root.join() - - def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder): """ Check that a timeline is deleted locally on subsequent restart if it never successfully uploaded during creation. diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 3f5de100fd..5a9c2782e6 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -273,9 +273,24 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r timeline_id = env.initial_timeline pg_version = env.pg_version - shutil.rmtree(repo_dir / "local_fs_remote_storage") + # Delete all files from local_fs_remote_storage except initdb.tar.zst, + # the file is required for `timeline_create` with `existing_initdb_timeline_id`. 
+ # + # TODO: switch to Path.walk() in Python 3.12 + # for dirpath, _dirnames, filenames in (repo_dir / "local_fs_remote_storage").walk(): + for dirpath, _dirnames, filenames in os.walk(repo_dir / "local_fs_remote_storage"): + for filename in filenames: + if filename != "initdb.tar.zst": + (Path(dirpath) / filename).unlink() + timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) - pageserver_http.timeline_create(pg_version, tenant_id, timeline_id) + pageserver_http.timeline_create( + pg_version=pg_version, + tenant_id=tenant_id, + new_timeline_id=timeline_id, + existing_initdb_timeline_id=timeline_id, + ) + pg_bin.run_capture( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"] ) diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index b14b7f1328..64ade346aa 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -4,7 +4,7 @@ from typing import Any, Dict, Optional import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver -from fixtures.remote_storage import RemoteStorageKind +from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until from fixtures.workload import Workload @@ -330,3 +330,46 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder): workload.churn_rows(64, pageserver_b.id) workload.validate(pageserver_b.id) + + +def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder): + """ + Test the sequence of location states that are used in a live migration. + """ + env = neon_env_builder.init_start() # initial_tenant_conf=TENANT_CONF) + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + # Write some data so that we have some layers + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageservers[0].id) + + # Write some layers and upload a heatmap + workload.write_rows(256, env.pageservers[0].id) + env.pageserver.http_client().tenant_heatmap_upload(tenant_id) + + def validate_heatmap(heatmap): + assert len(heatmap["timelines"]) == 1 + assert heatmap["timelines"][0]["timeline_id"] == str(timeline_id) + assert len(heatmap["timelines"][0]["layers"]) > 0 + layers = heatmap["timelines"][0]["layers"] + + # Each layer appears at most once + assert len(set(layer["name"] for layer in layers)) == len(layers) + + # Download and inspect the heatmap that the pageserver uploaded + heatmap_first = env.pageserver_remote_storage.heatmap_content(tenant_id) + log.info(f"Read back heatmap: {heatmap_first}") + validate_heatmap(heatmap_first) + + # Do some more I/O to generate more layers + workload.churn_rows(64, env.pageservers[0].id) + env.pageserver.http_client().tenant_heatmap_upload(tenant_id) + + # Ensure that another heatmap upload includes the new layers + heatmap_second = env.pageserver_remote_storage.heatmap_content(tenant_id) + log.info(f"Read back heatmap: {heatmap_second}") + assert heatmap_second != heatmap_first + validate_heatmap(heatmap_second) diff --git a/test_runner/regress/test_wal_restore.py b/test_runner/regress/test_wal_restore.py index 4a9ffeee4b..7d03f644d1 100644 --- a/test_runner/regress/test_wal_restore.py +++ b/test_runner/regress/test_wal_restore.py @@ -1,7 +1,6 @@ import sys import tarfile import tempfile -import time from pathlib 
import Path import pytest @@ -12,6 +11,7 @@ from fixtures.neon_fixtures import ( PgBin, VanillaPostgres, ) +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import LocalFsStorage from fixtures.types import Lsn, TenantId, TimelineId @@ -128,10 +128,7 @@ def test_wal_restore_initdb( assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)] -def test_wal_restore_http( - neon_env_builder: NeonEnvBuilder, - test_output_dir: Path, -): +def test_wal_restore_http(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") endpoint.safe_psql("create table t as select generate_series(1,300000)") @@ -145,15 +142,7 @@ def test_wal_restore_http( assert isinstance(env.pageserver_remote_storage, LocalFsStorage) - test_output_dir / "initdb.tar.zst" - - (env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / "initdb.tar.zst") - - ps_client.timeline_delete(tenant_id, timeline_id) - time.sleep(2) - - # verify that it is indeed deleted - # TODO + timeline_delete_wait_completed(ps_client, tenant_id, timeline_id) # issue the restoration command ps_client.timeline_create( diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index dd067cf656..0bb356aa0c 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit dd067cf656f6810a25aca6025633d32d02c5085a +Subproject commit 0bb356aa0cd1582112926fbcf0b5370222c2db6d diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index bc88f53931..24333abb81 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit bc88f539312fcc4bb292ce94ae9db09ab6656e8a +Subproject commit 24333abb81a9ecae4541019478f0bf7d0b289df7 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index e3a22b7292..863b71572b 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit e3a22b72922055f9212eca12700190f118578362 +Subproject commit 863b71572bc441581efb3bbee2ad18af037be1bb diff --git a/vendor/revisions.json b/vendor/revisions.json index c4cea208ee..a9575a2cb7 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "e3a22b72922055f9212eca12700190f118578362", - "postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a", - "postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a" + "postgres-v16": "863b71572bc441581efb3bbee2ad18af037be1bb", + "postgres-v15": "24333abb81a9ecae4541019478f0bf7d0b289df7", + "postgres-v14": "0bb356aa0cd1582112926fbcf0b5370222c2db6d" } diff --git a/vm-image-spec.yaml b/vm-image-spec.yaml index 6f0ebe5f66..804405293f 100644 --- a/vm-image-spec.yaml +++ b/vm-image-spec.yaml @@ -34,7 +34,7 @@ files: server_tls_sslmode=disable pool_mode=transaction max_client_conn=10000 - default_pool_size=16 + default_pool_size=64 max_prepared_statements=0 - filename: cgconfig.conf content: |