feat: three-phased startup order (#4399)

Initial logical size calculation could still hinder the fast startup
efforts of #4397; see #4183. In the deployment of 2023-06-06,
about 200 initial logical sizes were calculated on the hosts that
took the longest to complete initial load (12s).

Implements the three-step/tier initialization ordering described in
#4397:
1. load local tenants
2. calculate initial logical sizes (driven by walreceivers) for up to 10s
3. start background tasks

Ordering is controlled by:
- background tasks waiting on `utils::completion::Barrier`s
- giving each Timeline one attempt at initial logical size calculation
- `pageserver/src/bin/pageserver.rs` releasing background jobs after a
timeout or after all initial logical size calculations complete

The barrier primitive is sketched right after this list.
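For orientation, the gating primitive works by drop-to-release: a
`Barrier::wait()` resolves only once every clone of the paired `Completion`
has been dropped. A minimal illustrative sketch of those semantics, not the
actual `utils::completion` implementation (which, among other things, also
makes `Barrier` cloneable):

    // Illustrative stand-in for utils::completion, built on tokio mpsc:
    // recv() returns None once all Sender (Completion) clones are dropped.
    use tokio::sync::mpsc;

    #[derive(Clone)]
    struct Completion(mpsc::Sender<()>);

    struct Barrier(mpsc::Receiver<()>);

    fn channel() -> (Completion, Barrier) {
        let (tx, rx) = mpsc::channel(1);
        (Completion(tx), Barrier(rx))
    }

    impl Barrier {
        async fn wait(mut self) {
            // nothing is ever sent, so this resolves only on channel close
            while self.0.recv().await.is_some() {}
        }
    }

    #[tokio::main]
    async fn main() {
        let (completion, barrier) = channel();
        let background = tokio::spawn(async move {
            barrier.wait().await; // held back until startup releases us
            println!("background job released");
        });
        // each startup task holds a clone; dropping the last one opens the barrier
        drop(completion);
        background.await.unwrap();
    }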

The timeout is there only as a safeguard, in case a legitimate non-broken
timeline's initial logical size calculation runs long. The timeout is
configurable and defaults to 10s, which I think is fine for production
systems. In the test cases I have looked at, these steps complete as fast
as possible; the release logic that enforces the timeout is sketched below.
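Condensed from the `pageserver.rs` diff below, the release step races the
"all initial size attempts done" barrier against the timeout and then drops
the completion that background jobs wait on. A sketch with a hypothetical
free-standing signature (the real code additionally keeps waiting after the
timeout, purely to log when the sizes eventually finish):

    // Sketch: release background jobs on sizes-done or on timeout, whichever
    // comes first. `background_jobs_can_start` is a utils::completion::Completion
    // in the real code; dropping it opens the paired Barrier.
    async fn release_after_sizes_or_timeout<C>(
        init_logical_size_done: impl std::future::Future<Output = ()>,
        background_jobs_can_start: C,
        timeout: std::time::Duration,
    ) {
        tokio::select! {
            _ = init_logical_size_done => {
                tracing::info!("Initial logical sizes completed");
            }
            _ = tokio::time::sleep(timeout) => {
                tracing::info!("Initial logical size timeout elapsed; starting background jobs");
            }
        }
        drop(background_jobs_can_start);
    }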

Co-authored-by: Christian Schwarz <christian@neon.tech>
Joonas Koivunen
2023-06-07 14:29:23 +03:00
committed by GitHub
parent 88f0cfc575
commit 5761190e0d
10 changed files with 295 additions and 77 deletions


@@ -337,33 +337,114 @@ fn start_pageserver(
// Startup staging or optimizing:
//
// (init_done_tx, init_done_rx) are used to control when background loops start. This is to
// avoid starving out the BACKGROUND_RUNTIME async worker threads with heavy work, like
// initial repartitioning, while we still have Loading tenants.
// We want to minimize downtime for `page_service` connections, while trying not to overload
// BACKGROUND_RUNTIME by doing initial compactions and initial logical sizes at the same time.
//
// init_done_rx is a barrier which stops waiting once all init_done_tx clones are dropped.
// init_done_rx will notify when all initial load operations have completed.
//
// background_jobs_can_start (the same name is used on the consumer side to hold background
// jobs back from starting) will be dropped once we can start the background jobs. Currently it is behind
// completing all initial logical size calculations (init_logical_size_done_rx) and a timeout
// (background_task_maximum_delay).
let (init_done_tx, init_done_rx) = utils::completion::channel();
let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
let order = pageserver::InitializationOrder {
initial_tenant_load: Some(init_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: init_logical_size_done_tx,
background_jobs_can_start: background_jobs_barrier.clone(),
};
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
order,
))?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
let init_done_rx = init_done_rx;
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));
let elapsed = init_started_at.elapsed();
init_done_rx.wait().await;
// initial logical sizes can now start, as they were waiting on init_done_rx.
scopeguard::ScopeGuard::into_inner(guard);
let init_done = std::time::Instant::now();
let elapsed = init_done - init_started_at;
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
"Initial load completed"
);
let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait());
let timeout = conf.background_task_maximum_delay;
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
let init_sizes_done = tokio::select! {
_ = &mut init_sizes_done => {
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
from_init_millis = (now - init_started_at).as_millis(),
"Initial logical sizes completed"
);
None
}
_ = tokio::time::sleep(timeout) => {
tracing::info!(
timeout_millis = timeout.as_millis(),
"Initial logical size timeout elapsed; starting background jobs"
);
Some(init_sizes_done)
}
};
scopeguard::ScopeGuard::into_inner(guard);
// allow background jobs to start
drop(background_jobs_can_start);
if let Some(init_sizes_done) = init_sizes_done {
// ending up here is not a bug; at the latest, logical sizes will be queried by
// consumption metrics.
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
init_sizes_done.await;
scopeguard::ScopeGuard::into_inner(guard);
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
from_init_millis = (now - init_started_at).as_millis(),
"Initial logical sizes completed after timeout (background jobs already started)"
);
}
};
async move {
let mut drive_init = std::pin::pin!(drive_init);
// just race these tasks
tokio::select! {
_ = shutdown_pageserver.cancelled() => {},
_ = &mut drive_init => {},
}
}
});
@@ -378,7 +459,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
background_jobs_barrier.clone(),
)?;
}
@@ -416,7 +497,7 @@ fn start_pageserver(
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx;
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -432,12 +513,17 @@ fn start_pageserver(
"consumption metrics collection",
true,
async move {
// first wait for initial load to complete before first iteration.
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
init_done_rx.wait().await;
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
@@ -487,6 +573,8 @@ fn start_pageserver(
);
}
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.
ShutdownSignals::handle(|signal| match signal {
Signal::Quit => {
@@ -502,6 +590,11 @@ fn start_pageserver(
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
unreachable!()
}


@@ -63,6 +63,7 @@ pub mod defaults {
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
///
/// Default built-in configuration file.
@@ -91,9 +92,10 @@ pub mod defaults {
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -187,6 +189,15 @@ pub struct PageServerConf {
pub test_remote_failures: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
/// How long background tasks will be delayed at most after the initial load of tenants.
///
/// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
/// now that we isolate initial loading, initial logical size calculation, and background tasks.
/// Smaller nodes will have background tasks "not running" for this long unless every timeline
/// has its initial logical size calculated. Not running background tasks for some seconds is
/// not terrible.
pub background_task_maximum_delay: Duration,
}
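The default is kept as a humantime string and parsed when the builder
defaults are constructed, as the builder changes below show. A standalone
check of that parsing path, assuming only the `humantime` crate:

    fn main() {
        // same parsing the builder applies to DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
        let default = humantime::parse_duration("10s").unwrap();
        assert_eq!(default, std::time::Duration::from_secs(10));

        // overrides in pageserver.toml take the same path; the test config
        // further below sets background_task_maximum_delay = '334 s'
        let overridden = humantime::parse_duration("334 s").unwrap();
        assert_eq!(overridden, std::time::Duration::from_secs(334));
    }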
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -259,6 +270,8 @@ struct PageServerConfigBuilder {
test_remote_failures: BuilderValue<u64>,
ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
background_task_maximum_delay: BuilderValue<Duration>,
}
impl Default for PageServerConfigBuilder {
@@ -316,6 +329,11 @@ impl Default for PageServerConfigBuilder {
test_remote_failures: Set(0),
ondemand_download_behavior_treat_error_as_warn: Set(false),
background_task_maximum_delay: Set(humantime::parse_duration(
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
}
}
}
@@ -440,6 +458,10 @@ impl PageServerConfigBuilder {
BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
}
pub fn background_task_maximum_delay(&mut self, delay: Duration) {
self.background_task_maximum_delay = BuilderValue::Set(delay);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -522,6 +544,9 @@ impl PageServerConfigBuilder {
.ok_or(anyhow!(
"missing ondemand_download_behavior_treat_error_as_warn"
))?,
background_task_maximum_delay: self
.background_task_maximum_delay
.ok_or(anyhow!("missing background_task_maximum_delay"))?,
})
}
}
@@ -710,6 +735,7 @@ impl PageServerConf {
)
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
"background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -877,6 +903,7 @@ impl PageServerConf {
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::ZERO,
}
}
}
@@ -1036,6 +1063,7 @@ metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
log_format = 'json'
background_task_maximum_delay = '334 s'
"#;
@@ -1094,6 +1122,9 @@ log_format = 'json'
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: humantime::parse_duration(
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
)?,
},
"Correct defaults should be used when no config values are provided"
);
@@ -1148,6 +1179,7 @@ log_format = 'json'
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::from_secs(334),
},
"Should be able to parse all basic config values correctly"
);


@@ -83,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done: completion::Barrier,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -100,17 +100,16 @@ pub fn launch_disk_usage_global_eviction_task(
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
init_done.wait().await;
let cancel = task_mgr::shutdown_token();
disk_usage_eviction_task(
&state,
task_config,
storage,
&conf.tenants_path(),
task_mgr::shutdown_token(),
)
.await;
// wait until initial load is complete, because we cannot evict from loading tenants.
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
.await;
info!("disk usage based eviction task finishing");
Ok(())
},


@@ -132,6 +132,29 @@ pub fn is_uninit_mark(path: &Path) -> bool {
}
}
/// During pageserver startup, we need to order operations so that we do not exhaust tokio
/// worker threads by blocking.
///
/// Instances of this value exist only during startup; otherwise `None` is provided, meaning no
/// delaying is needed.
#[derive(Clone)]
pub struct InitializationOrder {
/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,
/// Barrier for when we can start initial logical size calculations.
pub initial_logical_size_can_start: utils::completion::Barrier,
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: utils::completion::Completion,
/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of background job.
pub background_jobs_can_start: utils::completion::Barrier,
}
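Consumers of `background_jobs_can_start` throughout this diff share one
idiom: race shutdown cancellation against the barrier, so shutdown is never
blocked behind startup gating. A sketch of that idiom as a hypothetical
free-standing helper (the real call sites inline the `select!`; `Barrier`
is this repo's `utils::completion::Barrier`):

    use tokio_util::sync::CancellationToken;
    use utils::completion::Barrier;

    /// Returns false when cancelled before the barrier opened.
    async fn wait_until_released(cancel: &CancellationToken, barrier: Option<Barrier>) -> bool {
        tokio::select! {
            _ = cancel.cancelled() => false,
            // maybe_wait resolves immediately when no barrier was handed out,
            // i.e. for tenants loaded after pageserver startup
            _ = Barrier::maybe_wait(barrier) => true,
        }
    }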
#[cfg(test)]
mod backoff_defaults_tests {
use super::*;


@@ -65,6 +65,7 @@ use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::tenant::storage_layer::Layer;
use crate::InitializationOrder;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
@@ -510,6 +511,7 @@ impl Tenant {
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
first_save: bool,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_id;
@@ -535,6 +537,7 @@ impl Tenant {
up_to_date_metadata,
ancestor.clone(),
remote_client,
init_order,
)?;
let timeline = UninitializedTimeline {
@@ -560,6 +563,7 @@ impl Tenant {
up_to_date_metadata,
ancestor.clone(),
None,
None,
)
.with_context(|| {
format!("creating broken timeline data for {tenant_id}/{timeline_id}")
@@ -858,6 +862,7 @@ impl Tenant {
local_metadata,
ancestor,
true,
None,
ctx,
)
.await
@@ -892,16 +897,13 @@ impl Tenant {
///
/// If the loading fails for some reason, the Tenant will go into Broken
/// state.
///
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Arc<Tenant> {
debug_assert_current_span_has_tenant_id();
@@ -937,17 +939,17 @@ impl Tenant {
"initial tenant load",
false,
async move {
// keep the sender alive as long as we have the initial load ongoing; it will be
// None for loads spawned after init_tenant_mgr.
let (_tx, rx) = if let Some((tx, rx)) = init_done {
(Some(tx), Some(rx))
} else {
(None, None)
};
match tenant_clone.load(&ctx).await {
let mut init_order = init_order;
// take the completion because initial tenant loading will complete when all of
// these tasks complete.
let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take());
match tenant_clone.load(init_order.as_ref(), &ctx).await {
Ok(()) => {
debug!("load finished, activating");
tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
@@ -974,7 +976,11 @@ impl Tenant {
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
async fn load(
self: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
@@ -1094,7 +1100,7 @@ impl Tenant {
// 1. "Timeline has no ancestor and no layer files"
for (timeline_id, local_metadata) in sorted_timelines {
self.load_local_timeline(timeline_id, local_metadata, ctx)
self.load_local_timeline(timeline_id, local_metadata, init_order, ctx)
.await
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
@@ -1112,6 +1118,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -1181,6 +1188,7 @@ impl Tenant {
Some(local_metadata),
ancestor,
false,
init_order,
ctx,
)
.await
@@ -1724,12 +1732,12 @@ impl Tenant {
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
/// to delay background jobs. Background jobs can be started right away when None is given.
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
debug_assert_current_span_has_tenant_id();
@@ -1762,12 +1770,12 @@ impl Tenant {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, init_done);
tasks::start_background_loops(self, background_jobs_can_start);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), init_done, ctx);
timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
activated_timelines += 1;
}
@@ -2158,6 +2166,7 @@ impl Tenant {
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
remote_client: Option<RemoteTimelineClient>,
init_order: Option<&InitializationOrder>,
) -> anyhow::Result<Arc<Timeline>> {
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
@@ -2166,6 +2175,9 @@ impl Tenant {
)
}
let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
let pg_version = new_metadata.pg_version();
Ok(Timeline::new(
self.conf,
@@ -2177,6 +2189,8 @@ impl Tenant {
Arc::clone(&self.walredo_mgr),
remote_client,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned(),
))
}
@@ -2852,7 +2866,7 @@ impl Tenant {
remote_client: Option<RemoteTimelineClient>,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_data = self
.create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client)
.create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client, None)
.context("Failed to create timeline data structure")?;
crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?;
@@ -3420,7 +3434,7 @@ pub mod harness {
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant
.load(ctx)
.load(None, ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
tenant.state.send_replace(TenantState::Active);


@@ -21,9 +21,8 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -65,7 +64,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
init_order: InitializationOrder,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -122,7 +121,7 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
Some(init_order.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -153,14 +152,12 @@ pub async fn init_tenant_mgr(
Ok(())
}
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -219,7 +216,7 @@ pub fn schedule_local_tenant_processing(
tenant_id,
broker_client,
remote_storage,
init_done,
init_order,
ctx,
)
};


@@ -15,10 +15,10 @@ use tracing::*;
use utils::completion;
/// Start per tenant background loops: compaction and gc.
///
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
pub fn start_background_loops(
tenant: &Arc<Tenant>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -29,10 +29,14 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
compaction_loop(tenant)
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
compaction_loop(tenant, cancel)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
@@ -48,10 +52,14 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
gc_loop(tenant)
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
@@ -63,12 +71,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>) {
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
@@ -133,12 +140,11 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>) {
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =


@@ -242,6 +242,13 @@ pub struct Timeline {
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
/// Barrier to wait before doing initial logical size calculation. Used only during startup.
initial_logical_size_can_start: Option<completion::Barrier>,
/// Completion shared between all timelines loaded during startup; used to delay heavier
/// background tasks until some logical sizes have been calculated.
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
}
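The `initial_logical_size_attempt` completion must be consumed exactly once,
by whichever path gets there first: the initial size calculation task, or a
transition to Stopping/Broken in `set_state` below. A minimal sketch of that
idiom (types from this repo's `utils` crate):

    use std::sync::Mutex;

    struct Attempt(Mutex<Option<utils::completion::Completion>>);

    impl Attempt {
        fn finish(&self) {
            // take() leaves None behind, so a second caller is a no-op; dropping
            // the Completion lets the startup barrier proceed even if the lock
            // was poisoned by a panicking thread
            let _completion = self.0.lock().unwrap_or_else(|e| e.into_inner()).take();
        }
    }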
/// Internal structure to hold all data needed for logical size calculation.
@@ -932,12 +939,12 @@ impl Timeline {
pub fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(init_done);
self.launch_eviction_task(background_jobs_can_start);
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -955,6 +962,14 @@ impl Timeline {
error!("Not activating a Stopping timeline");
}
(_, new_state) => {
if matches!(new_state, TimelineState::Stopping | TimelineState::Broken) {
// drop the completion guard, if any; it might needlessly be holding off
// completion forever
self.initial_logical_size_attempt
.lock()
.unwrap_or_else(|e| e.into_inner())
.take();
}
self.state.send_replace(new_state);
}
}
@@ -1345,6 +1360,8 @@ impl Timeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Loading);
@@ -1439,6 +1456,9 @@ impl Timeline {
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1927,7 +1947,27 @@ impl Timeline {
false,
// NB: don't log errors here, task_mgr will do that.
async move {
// no cancellation here, because nothing really waits for this to complete compared
let cancel = task_mgr::shutdown_token();
// in case we were created during pageserver initialization, wait for
// initialization to complete before proceeding. startup time init runs on the same
// runtime.
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
};
// hold off background tasks from starting until all timelines have had at least
// one attempt at initial logical size calculation; a retry would rarely be useful.
// holding off is done because heavier tasks execute blockingly on the same
// runtime.
//
// dropping this at every outcome is probably better than trying to cling on to it;
// the delay will be terminated by a timeout regardless.
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
// no extra cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();


@@ -49,9 +49,12 @@ pub struct EvictionTaskTenantState {
}
impl Timeline {
pub(super) fn launch_eviction_task(self: &Arc<Self>, init_done: Option<&completion::Barrier>) {
pub(super) fn launch_eviction_task(
self: &Arc<Self>,
background_tasks_can_start: Option<&completion::Barrier>,
) {
let self_clone = Arc::clone(self);
let init_done = init_done.cloned();
let background_tasks_can_start = background_tasks_can_start.cloned();
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Eviction,
@@ -60,8 +63,13 @@ impl Timeline {
&format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
false,
async move {
completion::Barrier::maybe_wait(init_done).await;
self_clone.eviction_task(task_mgr::shutdown_token()).await;
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); }
_ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
};
self_clone.eviction_task(cancel).await;
info!("eviction task finishing");
Ok(())
},


@@ -110,6 +110,12 @@ class EvictionEnv:
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
# Disk usage based eviction runs as a background task.
# Pageserver startup delays the launch of background tasks for some time, to
# prioritize initial logical size calculations during startup.
# However, initial logical size calculation may not be triggered if safekeepers
# don't publish new broker messages, and this test only has a 10-second timeout.
# So, disable the delay for this test.
"--pageserver-config-override=background_task_maximum_delay='0s'",
),
)