mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
eviction: avoid post-restart download by synthetic_size (#3871)
As of #3867, we do artificial layer accesses to layers that will be needed after the next restart, but not until then because of caches. With this patch, we also do that for the accesses that the synthetic size calculation worker does if consumption metrics are enabled. The actual size calculation is not of importance, but we need to calculate all of the sizes, so we only call tenant::size::gather_inputs. Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
@@ -165,6 +165,10 @@ pub struct PageServerConf {
|
||||
|
||||
/// Number of concurrent [`Tenant::gather_size_inputs`] allowed.
|
||||
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
|
||||
/// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
|
||||
/// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
|
||||
/// See the comment in `eviction_task` for details.
|
||||
pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
|
||||
|
||||
// How often to collect metrics and send them to the metrics endpoint.
|
||||
pub metric_collection_interval: Duration,
|
||||
@@ -239,7 +243,7 @@ struct PageServerConfigBuilder {
|
||||
|
||||
log_format: BuilderValue<LogFormat>,
|
||||
|
||||
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
|
||||
concurrent_tenant_size_logical_size_queries: BuilderValue<NonZeroUsize>,
|
||||
|
||||
metric_collection_interval: BuilderValue<Duration>,
|
||||
cached_metric_collection_interval: BuilderValue<Duration>,
|
||||
@@ -286,7 +290,9 @@ impl Default for PageServerConfigBuilder {
|
||||
.expect("cannot parse default keepalive interval")),
|
||||
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
|
||||
|
||||
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
|
||||
concurrent_tenant_size_logical_size_queries: Set(
|
||||
ConfigurableSemaphore::DEFAULT_INITIAL,
|
||||
),
|
||||
metric_collection_interval: Set(humantime::parse_duration(
|
||||
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
||||
)
|
||||
@@ -389,7 +395,7 @@ impl PageServerConfigBuilder {
|
||||
self.log_format = BuilderValue::Set(log_format)
|
||||
}
|
||||
|
||||
pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: ConfigurableSemaphore) {
|
||||
pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: NonZeroUsize) {
|
||||
self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
|
||||
}
|
||||
|
||||
@@ -434,6 +440,11 @@ impl PageServerConfigBuilder {
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_size_logical_size_queries = self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
.ok_or(anyhow!(
|
||||
"missing concurrent_tenant_size_logical_size_queries"
|
||||
))?;
|
||||
Ok(PageServerConf {
|
||||
listen_pg_addr: self
|
||||
.listen_pg_addr
|
||||
@@ -481,11 +492,12 @@ impl PageServerConfigBuilder {
|
||||
.broker_keepalive_interval
|
||||
.ok_or(anyhow!("No broker keepalive interval provided"))?,
|
||||
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
|
||||
concurrent_tenant_size_logical_size_queries: self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
.ok_or(anyhow!(
|
||||
"missing concurrent_tenant_size_logical_size_queries"
|
||||
))?,
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
|
||||
concurrent_tenant_size_logical_size_queries,
|
||||
),
|
||||
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
|
||||
concurrent_tenant_size_logical_size_queries,
|
||||
),
|
||||
metric_collection_interval: self
|
||||
.metric_collection_interval
|
||||
.ok_or(anyhow!("missing metric_collection_interval"))?,
|
||||
@@ -680,8 +692,7 @@ impl PageServerConf {
|
||||
"concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
|
||||
let input = parse_toml_string(key, item)?;
|
||||
let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
|
||||
let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?;
|
||||
ConfigurableSemaphore::new(permits)
|
||||
NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?
|
||||
}),
|
||||
"metric_collection_interval" => builder.metric_collection_interval(parse_toml_duration(key, item)?),
|
||||
"cached_metric_collection_interval" => builder.cached_metric_collection_interval(parse_toml_duration(key, item)?),
|
||||
@@ -829,6 +840,8 @@ impl PageServerConf {
|
||||
broker_keepalive_interval: Duration::from_secs(5000),
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::default(
|
||||
),
|
||||
metric_collection_interval: Duration::from_secs(60),
|
||||
cached_metric_collection_interval: Duration::from_secs(60 * 60),
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
@@ -921,6 +934,11 @@ impl ConfigurableSemaphore {
|
||||
inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the configured amount of permits.
|
||||
pub fn initial_permits(&self) -> NonZeroUsize {
|
||||
self.initial_permits
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConfigurableSemaphore {
|
||||
@@ -1025,6 +1043,8 @@ log_format = 'json'
|
||||
)?,
|
||||
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
eviction_task_immitated_concurrent_logical_size_queries:
|
||||
ConfigurableSemaphore::default(),
|
||||
metric_collection_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
|
||||
)?,
|
||||
@@ -1085,6 +1105,8 @@ log_format = 'json'
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
log_format: LogFormat::Json,
|
||||
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
|
||||
eviction_task_immitated_concurrent_logical_size_queries:
|
||||
ConfigurableSemaphore::default(),
|
||||
metric_collection_interval: Duration::from_secs(222),
|
||||
cached_metric_collection_interval: Duration::from_secs(22200),
|
||||
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
||||
|
||||
@@ -46,6 +46,7 @@ use std::time::{Duration, Instant};
|
||||
use self::config::TenantConf;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir;
|
||||
@@ -142,6 +143,8 @@ pub struct Tenant {
|
||||
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
|
||||
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
|
||||
cached_synthetic_tenant_size: Arc<AtomicU64>,
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
}
|
||||
|
||||
/// A timeline with some of its files on disk, being initialized.
|
||||
@@ -1781,6 +1784,7 @@ impl Tenant {
|
||||
state,
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::Arc;
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
@@ -352,6 +353,10 @@ async fn fill_logical_sizes(
|
||||
// our advantage with `?` error handling.
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
// be sure to cancel all spawned tasks if we are dropped
|
||||
let _dg = cancel.clone().drop_guard();
|
||||
|
||||
// For each point that would benefit from having a logical size available,
|
||||
// spawn a Task to fetch it, unless we have it cached already.
|
||||
for seg in segments.iter() {
|
||||
@@ -373,6 +378,7 @@ async fn fill_logical_sizes(
|
||||
timeline,
|
||||
lsn,
|
||||
ctx,
|
||||
cancel.child_token(),
|
||||
));
|
||||
}
|
||||
e.insert(cached_size);
|
||||
@@ -477,13 +483,14 @@ async fn calculate_logical_size(
|
||||
timeline: Arc<crate::tenant::Timeline>,
|
||||
lsn: utils::lsn::Lsn,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<TimelineAtLsnSizeResult, RecvError> {
|
||||
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
.await
|
||||
.expect("global semaphore should not had been closed");
|
||||
|
||||
let size_res = timeline
|
||||
.spawn_ondemand_logical_size_calculation(lsn, ctx)
|
||||
.spawn_ondemand_logical_size_calculation(lsn, ctx, cancel)
|
||||
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
|
||||
.await?;
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
|
||||
@@ -71,6 +71,7 @@ use crate::ZERO_PAGE;
|
||||
use crate::{is_temporary, task_mgr};
|
||||
use walreceiver::spawn_connection_manager_task;
|
||||
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
|
||||
use super::layer_map::BatchedUpdates;
|
||||
@@ -1737,8 +1738,11 @@ impl Timeline {
|
||||
false,
|
||||
// NB: don't log errors here, task_mgr will do that.
|
||||
async move {
|
||||
// no cancellation here, because nothing really waits for this to complete compared
|
||||
// to spawn_ondemand_logical_size_calculation.
|
||||
let cancel = CancellationToken::new();
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, &background_ctx)
|
||||
.logical_size_calculation_task(lsn, &background_ctx, cancel)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
@@ -1793,6 +1797,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
lsn: Lsn,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let self_clone = Arc::clone(self);
|
||||
@@ -1812,7 +1817,9 @@ impl Timeline {
|
||||
"ondemand logical size calculation",
|
||||
false,
|
||||
async move {
|
||||
let res = self_clone.logical_size_calculation_task(lsn, &ctx).await;
|
||||
let res = self_clone
|
||||
.logical_size_calculation_task(lsn, &ctx, cancel)
|
||||
.await;
|
||||
let _ = sender.send(res).ok();
|
||||
Ok(()) // Receiver is responsible for handling errors
|
||||
},
|
||||
@@ -1825,10 +1832,10 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let calculation = async {
|
||||
let cancel = cancel.child_token();
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
//!
|
||||
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ops::ControlFlow,
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
@@ -30,6 +31,7 @@ use crate::{
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
storage_layer::PersistentLayer,
|
||||
Tenant,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -37,7 +39,12 @@ use super::Timeline;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct EvictionTaskTimelineState {
|
||||
last_refresh_required_in_restart: Option<tokio::time::Instant>,
|
||||
last_layer_access_imitation: Option<tokio::time::Instant>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct EvictionTaskTenantState {
|
||||
last_layer_access_imitation: Option<Instant>,
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
@@ -127,6 +134,35 @@ impl Timeline {
|
||||
) -> ControlFlow<()> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
// If we evict layers but keep cached values derived from those layers, then
|
||||
// we face a storm of on-demand downloads after pageserver restart.
|
||||
// The reason is that the restart empties the caches, and so, the values
|
||||
// need to be re-computed by accessing layers, which we evicted while the
|
||||
// caches were filled.
|
||||
//
|
||||
// Solutions here would be one of the following:
|
||||
// 1. Have a persistent cache.
|
||||
// 2. Count every access to a cached value to the access stats of all layers
|
||||
// that were accessed to compute the value in the first place.
|
||||
// 3. Invalidate the caches at a period of < p.threshold/2, so that the values
|
||||
// get re-computed from layers, thereby counting towards layer access stats.
|
||||
// 4. Make the eviction task imitate the layer accesses that typically hit caches.
|
||||
//
|
||||
// We follow approach (4) here because in Neon prod deployment:
|
||||
// - page cache is quite small => high churn => low hit rate
|
||||
// => eviction gets correct access stats
|
||||
// - value-level caches such as logical size & repatition have a high hit rate,
|
||||
// especially for inactive tenants
|
||||
// => eviction sees zero accesses for these
|
||||
// => they cause the on-demand download storm on pageserver restart
|
||||
//
|
||||
// We should probably move to persistent caches in the future, or avoid
|
||||
// having inactive tenants attached to pageserver in the first place.
|
||||
match self.imitate_layer_accesses(p, cancel, ctx).await {
|
||||
ControlFlow::Break(()) => return ControlFlow::Break(()),
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default)]
|
||||
struct EvictionStats {
|
||||
@@ -137,27 +173,6 @@ impl Timeline {
|
||||
skipped_for_shutdown: usize,
|
||||
}
|
||||
|
||||
// what we want is to invalidate any caches which haven't been accessed for `p.threshold`,
|
||||
// but we cannot actually do it for current limitations except by restarting pageserver. we
|
||||
// just recompute the values which would be recomputed on startup.
|
||||
//
|
||||
// for active tenants this will likely materialized page cache or in-memory layers. for
|
||||
// inactive tenants it will refresh the last_access timestamps so that we will not evict
|
||||
// and re-download on restart these layers.
|
||||
let mut state = self.eviction_task_timeline_state.lock().await;
|
||||
match state.last_refresh_required_in_restart {
|
||||
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
|
||||
_ => {
|
||||
self.refresh_layers_required_in_restart(cancel, ctx).await;
|
||||
state.last_refresh_required_in_restart = Some(tokio::time::Instant::now())
|
||||
}
|
||||
}
|
||||
drop(state);
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
|
||||
let mut stats = EvictionStats::default();
|
||||
// Gather layers for eviction.
|
||||
// NB: all the checks can be invalidated as soon as we release the layer map lock.
|
||||
@@ -261,8 +276,55 @@ impl Timeline {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
|
||||
async fn imitate_layer_accesses(
|
||||
&self,
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
let mut state = self.eviction_task_timeline_state.lock().await;
|
||||
match state.last_layer_access_imitation {
|
||||
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
|
||||
_ => {
|
||||
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
|
||||
.await;
|
||||
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
|
||||
}
|
||||
}
|
||||
drop(state);
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
|
||||
// This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
|
||||
// Make one of the tenant's timelines draw the short straw and run the calculation.
|
||||
// The others wait until the calculation is done so that they take into account the
|
||||
// imitated accesses that the winner made.
|
||||
let Ok(tenant) = crate::tenant::mgr::get_tenant(self.tenant_id, true).await else {
|
||||
// likely, we're shutting down
|
||||
return ControlFlow::Break(());
|
||||
};
|
||||
let mut state = tenant.eviction_task_tenant_state.lock().await;
|
||||
match state.last_layer_access_imitation {
|
||||
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
|
||||
_ => {
|
||||
self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
|
||||
.await;
|
||||
state.last_layer_access_imitation = Some(tokio::time::Instant::now());
|
||||
}
|
||||
}
|
||||
drop(state);
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
|
||||
/// Recompute the values which would cause on-demand downloads during restart.
|
||||
async fn refresh_layers_required_in_restart(
|
||||
async fn imitate_timeline_cached_layer_accesses(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
@@ -296,4 +358,62 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Imitate the synthetic size calculation done by the consumption_metrics module.
|
||||
async fn imitate_synthetic_size_calculation_worker(
|
||||
&self,
|
||||
tenant: &Arc<Tenant>,
|
||||
ctx: &RequestContext,
|
||||
cancel: &CancellationToken,
|
||||
) {
|
||||
if self.conf.metric_collection_endpoint.is_none() {
|
||||
// We don't start the consumption metrics task if this is not set in the config.
|
||||
// So, no need to imitate the accesses in that case.
|
||||
return;
|
||||
}
|
||||
|
||||
// The consumption metrics are collected on a per-tenant basis, by a single
|
||||
// global background loop.
|
||||
// It limits the number of synthetic size calculations using the global
|
||||
// `concurrent_tenant_size_logical_size_queries` semaphore to not overload
|
||||
// the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
|
||||
//
|
||||
// If we used that same semaphore here, then we'd compete for the
|
||||
// same permits, which may impact timeliness of consumption metrics.
|
||||
// That is a no-go, as consumption metrics are much more important
|
||||
// than what we do here.
|
||||
//
|
||||
// So, we have a separate semaphore, initialized to the same
|
||||
// number of permits as the `concurrent_tenant_size_logical_size_queries`.
|
||||
// In the worst, we would have twice the amount of concurrenct size calculations.
|
||||
// But in practice, the `p.threshold` >> `consumption metric interval`, and
|
||||
// we spread out the eviction task using `random_init_delay`.
|
||||
// So, the chance of the worst case is quite low in practice.
|
||||
// It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
|
||||
// So, we must coordinate with other with other eviction tasks of this tenant.
|
||||
let limit = self
|
||||
.conf
|
||||
.eviction_task_immitated_concurrent_logical_size_queries
|
||||
.inner();
|
||||
|
||||
let mut throwaway_cache = HashMap::new();
|
||||
let gather =
|
||||
crate::tenant::size::gather_inputs(tenant, limit, None, &mut throwaway_cache, ctx);
|
||||
tokio::pin!(gather);
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {}
|
||||
gather_result = gather => {
|
||||
match gather_result {
|
||||
Ok(_) => {},
|
||||
Err(e) => {
|
||||
// We don't care about the result, but, if it failed, we should log it,
|
||||
// since consumption metric might be hitting the cached value and
|
||||
// thus not encountering this error.
|
||||
warn!("failed to imitate synthetic size calculation accesses: {e:#}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user