diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 39282ce320..58a6056385 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -165,6 +165,10 @@ pub struct PageServerConf { /// Number of concurrent [`Tenant::gather_size_inputs`] allowed. pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore, + /// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`. + /// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`. + /// See the comment in `eviction_task` for details. + pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore, // How often to collect metrics and send them to the metrics endpoint. pub metric_collection_interval: Duration, @@ -239,7 +243,7 @@ struct PageServerConfigBuilder { log_format: BuilderValue, - concurrent_tenant_size_logical_size_queries: BuilderValue, + concurrent_tenant_size_logical_size_queries: BuilderValue, metric_collection_interval: BuilderValue, cached_metric_collection_interval: BuilderValue, @@ -286,7 +290,9 @@ impl Default for PageServerConfigBuilder { .expect("cannot parse default keepalive interval")), log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), - concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()), + concurrent_tenant_size_logical_size_queries: Set( + ConfigurableSemaphore::DEFAULT_INITIAL, + ), metric_collection_interval: Set(humantime::parse_duration( DEFAULT_METRIC_COLLECTION_INTERVAL, ) @@ -389,7 +395,7 @@ impl PageServerConfigBuilder { self.log_format = BuilderValue::Set(log_format) } - pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: ConfigurableSemaphore) { + pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) { self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u); } @@ -434,6 +440,11 @@ impl PageServerConfigBuilder { } pub fn build(self) -> anyhow::Result { + let concurrent_tenant_size_logical_size_queries = self + .concurrent_tenant_size_logical_size_queries + .ok_or(anyhow!( + "missing concurrent_tenant_size_logical_size_queries" + ))?; Ok(PageServerConf { listen_pg_addr: self .listen_pg_addr @@ -481,11 +492,12 @@ impl PageServerConfigBuilder { .broker_keepalive_interval .ok_or(anyhow!("No broker keepalive interval provided"))?, log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, - concurrent_tenant_size_logical_size_queries: self - .concurrent_tenant_size_logical_size_queries - .ok_or(anyhow!( - "missing concurrent_tenant_size_logical_size_queries" - ))?, + concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new( + concurrent_tenant_size_logical_size_queries, + ), + eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new( + concurrent_tenant_size_logical_size_queries, + ), metric_collection_interval: self .metric_collection_interval .ok_or(anyhow!("missing metric_collection_interval"))?, @@ -680,8 +692,7 @@ impl PageServerConf { "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({ let input = parse_toml_string(key, item)?; let permits = input.parse::().context("expected a number of initial permits, not {s:?}")?; - let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?; - ConfigurableSemaphore::new(permits) + NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")? }), "metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?), "cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?), @@ -829,6 +840,8 @@ impl PageServerConf { broker_keepalive_interval: Duration::from_secs(5000), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), + eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default( + ), metric_collection_interval: Duration::from_secs(60), cached_metric_collection_interval: Duration::from_secs(60 * 60), metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, @@ -921,6 +934,11 @@ impl ConfigurableSemaphore { inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())), } } + + /// Returns the configured amount of permits. + pub fn initial_permits(&self) -> NonZeroUsize { + self.initial_permits + } } impl Default for ConfigurableSemaphore { @@ -1025,6 +1043,8 @@ log_format = 'json' )?, log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), + eviction_task_immitated_concurrent_logical_size_queries: + ConfigurableSemaphore::default(), metric_collection_interval: humantime::parse_duration( defaults::DEFAULT_METRIC_COLLECTION_INTERVAL )?, @@ -1085,6 +1105,8 @@ log_format = 'json' broker_keepalive_interval: Duration::from_secs(5), log_format: LogFormat::Json, concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), + eviction_task_immitated_concurrent_logical_size_queries: + ConfigurableSemaphore::default(), metric_collection_interval: Duration::from_secs(222), cached_metric_collection_interval: Duration::from_secs(22200), metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0a167fd787..2c5226e5bc 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -46,6 +46,7 @@ use std::time::{Duration, Instant}; use self::config::TenantConf; use self::metadata::TimelineMetadata; use self::remote_timeline_client::RemoteTimelineClient; +use self::timeline::EvictionTaskTenantState; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::import_datadir; @@ -142,6 +143,8 @@ pub struct Tenant { /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`]. cached_logical_sizes: tokio::sync::Mutex>, cached_synthetic_tenant_size: Arc, + + eviction_task_tenant_state: tokio::sync::Mutex, } /// A timeline with some of its files on disk, being initialized. @@ -1781,6 +1784,7 @@ impl Tenant { state, cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), + eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), } } diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index a41889f16d..77275f96bd 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::{bail, Context}; use tokio::sync::oneshot::error::RecvError; use tokio::sync::Semaphore; +use tokio_util::sync::CancellationToken; use crate::context::RequestContext; use crate::pgdatadir_mapping::CalculateLogicalSizeError; @@ -352,6 +353,10 @@ async fn fill_logical_sizes( // our advantage with `?` error handling. let mut joinset = tokio::task::JoinSet::new(); + let cancel = tokio_util::sync::CancellationToken::new(); + // be sure to cancel all spawned tasks if we are dropped + let _dg = cancel.clone().drop_guard(); + // For each point that would benefit from having a logical size available, // spawn a Task to fetch it, unless we have it cached already. for seg in segments.iter() { @@ -373,6 +378,7 @@ async fn fill_logical_sizes( timeline, lsn, ctx, + cancel.child_token(), )); } e.insert(cached_size); @@ -477,13 +483,14 @@ async fn calculate_logical_size( timeline: Arc, lsn: utils::lsn::Lsn, ctx: RequestContext, + cancel: CancellationToken, ) -> Result { let _permit = tokio::sync::Semaphore::acquire_owned(limit) .await .expect("global semaphore should not had been closed"); let size_res = timeline - .spawn_ondemand_logical_size_calculation(lsn, ctx) + .spawn_ondemand_logical_size_calculation(lsn, ctx, cancel) .instrument(info_span!("spawn_ondemand_logical_size_calculation")) .await?; Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index dfa0e842f1..611c2c27d3 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -71,6 +71,7 @@ use crate::ZERO_PAGE; use crate::{is_temporary, task_mgr}; use walreceiver::spawn_connection_manager_task; +pub(super) use self::eviction_task::EvictionTaskTenantState; use self::eviction_task::EvictionTaskTimelineState; use super::layer_map::BatchedUpdates; @@ -1737,8 +1738,11 @@ impl Timeline { false, // NB: don't log errors here, task_mgr will do that. async move { + // no cancellation here, because nothing really waits for this to complete compared + // to spawn_ondemand_logical_size_calculation. + let cancel = CancellationToken::new(); let calculated_size = match self_clone - .logical_size_calculation_task(lsn, &background_ctx) + .logical_size_calculation_task(lsn, &background_ctx, cancel) .await { Ok(s) => s, @@ -1793,6 +1797,7 @@ impl Timeline { self: &Arc, lsn: Lsn, ctx: RequestContext, + cancel: CancellationToken, ) -> oneshot::Receiver> { let (sender, receiver) = oneshot::channel(); let self_clone = Arc::clone(self); @@ -1812,7 +1817,9 @@ impl Timeline { "ondemand logical size calculation", false, async move { - let res = self_clone.logical_size_calculation_task(lsn, &ctx).await; + let res = self_clone + .logical_size_calculation_task(lsn, &ctx, cancel) + .await; let _ = sender.send(res).ok(); Ok(()) // Receiver is responsible for handling errors }, @@ -1825,10 +1832,10 @@ impl Timeline { self: &Arc, lsn: Lsn, ctx: &RequestContext, + cancel: CancellationToken, ) -> Result { let mut timeline_state_updates = self.subscribe_for_state_updates(); let self_calculation = Arc::clone(self); - let cancel = CancellationToken::new(); let calculation = async { let cancel = cancel.child_token(); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 06dfe7a0b9..3ec8c30d70 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -14,6 +14,7 @@ //! //! See write-up on restart on-demand download spike: use std::{ + collections::HashMap, ops::ControlFlow, sync::Arc, time::{Duration, SystemTime}, @@ -30,6 +31,7 @@ use crate::{ tenant::{ config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, storage_layer::PersistentLayer, + Tenant, }, }; @@ -37,7 +39,12 @@ use super::Timeline; #[derive(Default)] pub struct EvictionTaskTimelineState { - last_refresh_required_in_restart: Option, + last_layer_access_imitation: Option, +} + +#[derive(Default)] +pub struct EvictionTaskTenantState { + last_layer_access_imitation: Option, } impl Timeline { @@ -127,6 +134,35 @@ impl Timeline { ) -> ControlFlow<()> { let now = SystemTime::now(); + // If we evict layers but keep cached values derived from those layers, then + // we face a storm of on-demand downloads after pageserver restart. + // The reason is that the restart empties the caches, and so, the values + // need to be re-computed by accessing layers, which we evicted while the + // caches were filled. + // + // Solutions here would be one of the following: + // 1. Have a persistent cache. + // 2. Count every access to a cached value to the access stats of all layers + // that were accessed to compute the value in the first place. + // 3. Invalidate the caches at a period of < p.threshold/2, so that the values + // get re-computed from layers, thereby counting towards layer access stats. + // 4. Make the eviction task imitate the layer accesses that typically hit caches. + // + // We follow approach (4) here because in Neon prod deployment: + // - page cache is quite small => high churn => low hit rate + // => eviction gets correct access stats + // - value-level caches such as logical size & repatition have a high hit rate, + // especially for inactive tenants + // => eviction sees zero accesses for these + // => they cause the on-demand download storm on pageserver restart + // + // We should probably move to persistent caches in the future, or avoid + // having inactive tenants attached to pageserver in the first place. + match self.imitate_layer_accesses(p, cancel, ctx).await { + ControlFlow::Break(()) => return ControlFlow::Break(()), + ControlFlow::Continue(()) => (), + } + #[allow(dead_code)] #[derive(Debug, Default)] struct EvictionStats { @@ -137,27 +173,6 @@ impl Timeline { skipped_for_shutdown: usize, } - // what we want is to invalidate any caches which haven't been accessed for `p.threshold`, - // but we cannot actually do it for current limitations except by restarting pageserver. we - // just recompute the values which would be recomputed on startup. - // - // for active tenants this will likely materialized page cache or in-memory layers. for - // inactive tenants it will refresh the last_access timestamps so that we will not evict - // and re-download on restart these layers. - let mut state = self.eviction_task_timeline_state.lock().await; - match state.last_refresh_required_in_restart { - Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ } - _ => { - self.refresh_layers_required_in_restart(cancel, ctx).await; - state.last_refresh_required_in_restart = Some(tokio::time::Instant::now()) - } - } - drop(state); - - if cancel.is_cancelled() { - return ControlFlow::Break(()); - } - let mut stats = EvictionStats::default(); // Gather layers for eviction. // NB: all the checks can be invalidated as soon as we release the layer map lock. @@ -261,8 +276,55 @@ impl Timeline { ControlFlow::Continue(()) } + async fn imitate_layer_accesses( + &self, + p: &EvictionPolicyLayerAccessThreshold, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> ControlFlow<()> { + let mut state = self.eviction_task_timeline_state.lock().await; + match state.last_layer_access_imitation { + Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ } + _ => { + self.imitate_timeline_cached_layer_accesses(cancel, ctx) + .await; + state.last_layer_access_imitation = Some(tokio::time::Instant::now()) + } + } + drop(state); + + if cancel.is_cancelled() { + return ControlFlow::Break(()); + } + + // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped. + // Make one of the tenant's timelines draw the short straw and run the calculation. + // The others wait until the calculation is done so that they take into account the + // imitated accesses that the winner made. + let Ok(tenant) = crate::tenant::mgr::get_tenant(self.tenant_id, true).await else { + // likely, we're shutting down + return ControlFlow::Break(()); + }; + let mut state = tenant.eviction_task_tenant_state.lock().await; + match state.last_layer_access_imitation { + Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ } + _ => { + self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel) + .await; + state.last_layer_access_imitation = Some(tokio::time::Instant::now()); + } + } + drop(state); + + if cancel.is_cancelled() { + return ControlFlow::Break(()); + } + + ControlFlow::Continue(()) + } + /// Recompute the values which would cause on-demand downloads during restart. - async fn refresh_layers_required_in_restart( + async fn imitate_timeline_cached_layer_accesses( &self, cancel: &CancellationToken, ctx: &RequestContext, @@ -296,4 +358,62 @@ impl Timeline { } } } + + // Imitate the synthetic size calculation done by the consumption_metrics module. + async fn imitate_synthetic_size_calculation_worker( + &self, + tenant: &Arc, + ctx: &RequestContext, + cancel: &CancellationToken, + ) { + if self.conf.metric_collection_endpoint.is_none() { + // We don't start the consumption metrics task if this is not set in the config. + // So, no need to imitate the accesses in that case. + return; + } + + // The consumption metrics are collected on a per-tenant basis, by a single + // global background loop. + // It limits the number of synthetic size calculations using the global + // `concurrent_tenant_size_logical_size_queries` semaphore to not overload + // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs). + // + // If we used that same semaphore here, then we'd compete for the + // same permits, which may impact timeliness of consumption metrics. + // That is a no-go, as consumption metrics are much more important + // than what we do here. + // + // So, we have a separate semaphore, initialized to the same + // number of permits as the `concurrent_tenant_size_logical_size_queries`. + // In the worst, we would have twice the amount of concurrenct size calculations. + // But in practice, the `p.threshold` >> `consumption metric interval`, and + // we spread out the eviction task using `random_init_delay`. + // So, the chance of the worst case is quite low in practice. + // It runs as a per-tenant task, but the eviction_task.rs is per-timeline. + // So, we must coordinate with other with other eviction tasks of this tenant. + let limit = self + .conf + .eviction_task_immitated_concurrent_logical_size_queries + .inner(); + + let mut throwaway_cache = HashMap::new(); + let gather = + crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx); + tokio::pin!(gather); + + tokio::select! { + _ = cancel.cancelled() => {} + gather_result = gather => { + match gather_result { + Ok(_) => {}, + Err(e) => { + // We don't care about the result, but, if it failed, we should log it, + // since consumption metric might be hitting the cached value and + // thus not encountering this error. + warn!("failed to imitate synthetic size calculation accesses: {e:#}") + } + } + } + } + } }