From f4db85de404f2bba8d8cede7440a1fa654d53ce6 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Tue, 30 May 2023 16:25:07 +0300
Subject: [PATCH] Continued startup speedup (#4372)

Startup continues to be slow; this works towards alleviating it.

Summary of changes:

- tidy the functional improvements from #4366 into
  `utils::completion::{Completion, Barrier}`
- extend "initial load completion" usage up to tenant background tasks
  - previously it covered only global background tasks
- run the tenant load directory traversal in `spawn_blocking`
- demote some logging
- remove some unwraps
- propagate some spans to `spawn_blocking`

The runtime effect should be a major speedup to loading, but after that
the `BACKGROUND_RUNTIME` will be blocked for a long time (minutes).

Possible follow-ups:

- complete initial tenant sizes before allowing background tasks to
  block the `BACKGROUND_RUNTIME`
---
 libs/utils/src/completion.rs               |  33 ++++
 libs/utils/src/lib.rs                      |   3 +
 pageserver/src/bin/pageserver.rs           |  11 +-
 pageserver/src/disk_usage_eviction_task.rs |   6 +-
 pageserver/src/tenant.rs                   | 207 +++++++++++----------
 pageserver/src/tenant/mgr.rs               |   9 +-
 pageserver/src/tenant/tasks.rs             |   7 +-
 pageserver/src/tenant/timeline.rs          |  19 +-
 test_runner/regress/test_tenants.py        |   2 +-
 9 files changed, 181 insertions(+), 116 deletions(-)
 create mode 100644 libs/utils/src/completion.rs

diff --git a/libs/utils/src/completion.rs b/libs/utils/src/completion.rs
new file mode 100644
index 0000000000..2cdaee548e
--- /dev/null
+++ b/libs/utils/src/completion.rs
@@ -0,0 +1,33 @@
+use std::sync::Arc;
+
+use tokio::sync::{mpsc, Mutex};
+
+/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
+///
+/// Can be cloned, moved and kept around in futures as "guard objects".
+#[derive(Clone)]
+pub struct Completion(mpsc::Sender<()>);
+
+/// Barrier will wait until all clones of [`Completion`] have been dropped.
+#[derive(Clone)]
+pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
+
+impl Barrier {
+    pub async fn wait(self) {
+        self.0.lock().await.recv().await;
+    }
+
+    pub async fn maybe_wait(barrier: Option<Barrier>) {
+        if let Some(b) = barrier {
+            b.wait().await
+        }
+    }
+}
+
+/// Create a new Completion and Barrier pair.
+pub fn channel() -> (Completion, Barrier) {
+    let (tx, rx) = mpsc::channel::<()>(1);
+    let rx = Mutex::new(rx);
+    let rx = Arc::new(rx);
+    (Completion(tx), Barrier(rx))
+}
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 4e4f79ab6b..69d3a1b9f2 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -60,6 +60,9 @@ pub mod tracing_span_assert;
 
 pub mod rate_limit;
 
+/// Simple once-barrier and a guard which keeps the barrier waiting.
+pub mod completion;
+
 mod failpoint_macro_helpers {
 
     /// use with fail::cfg("$name", "return(2000)")
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index cbc97e7228..a2cebffc83 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -338,8 +338,7 @@ fn start_pageserver(
     // All tenant load operations carry this while they are ongoing; it will be dropped once those
     // operations finish either successfully or in some other manner. However, the initial load
     // will then be done, and we can start the global background tasks.
-    let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
-    let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));
+    let (init_done_tx, init_done_rx) = utils::completion::channel();
 
     // Scan the local 'tenants/' directory and start loading the tenants
     let init_started_at = std::time::Instant::now();
@@ -347,14 +346,13 @@ fn start_pageserver(
         conf,
         broker_client.clone(),
         remote_storage.clone(),
-        init_done_tx,
+        (init_done_tx, init_done_rx.clone()),
     ))?;
 
     BACKGROUND_RUNTIME.spawn({
         let init_done_rx = init_done_rx.clone();
         async move {
-            let init_done = async move { init_done_rx.lock().await.recv().await };
-            init_done.await;
+            init_done_rx.wait().await;
 
             let elapsed = init_started_at.elapsed();
 
@@ -435,8 +433,7 @@ fn start_pageserver(
                 // this is because we only process active tenants and timelines, and the
                 // Timeline::get_current_logical_size will spawn the logical size calculation,
                 // which will not be rate-limited.
-                let init_done = async move { init_done_rx.lock().await.recv().await };
-                init_done.await;
+                init_done_rx.wait().await;
 
                 pageserver::consumption_metrics::collect_metrics(
                     metric_collection_endpoint,
diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 0358969199..1a8886935c 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -54,6 +54,7 @@ use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, instrument, warn, Instrument};
+use utils::completion;
 use utils::serde_percent::Percent;
 
 use crate::{
@@ -82,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task(
     conf: &'static PageServerConf,
     storage: GenericRemoteStorage,
     state: Arc<State>,
-    init_done_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<()>>>,
+    init_done: completion::Barrier,
 ) -> anyhow::Result<()> {
     let Some(task_config) = &conf.disk_usage_based_eviction else {
         info!("disk usage based eviction task not configured");
@@ -100,8 +101,7 @@ pub fn launch_disk_usage_global_eviction_task(
         false,
         async move {
             // wait until initial load is complete, because we cannot evict from loading tenants.
-            let init_done = async move { init_done_rx.lock().await.recv().await };
-            init_done.await;
+            init_done.wait().await;
 
             disk_usage_eviction_task(
                 &state,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index d6eb824107..ff975db601 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel;
 use tokio::sync::watch;
 use tokio::task::JoinSet;
 use tracing::*;
+use utils::completion;
 use utils::crashsafe::path_with_suffix_extension;
 
 use std::cmp::min;
@@ -653,7 +654,7 @@ impl Tenant {
             match tenant_clone.attach(&ctx).await {
                 Ok(()) => {
                     info!("attach finished, activating");
-                    tenant_clone.activate(broker_client, &ctx);
+                    tenant_clone.activate(broker_client, None, &ctx);
                 }
                 Err(e) => {
                     error!("attach failed, setting tenant state to Broken: {:?}", e);
@@ -889,15 +890,17 @@ impl Tenant {
     /// If the loading fails for some reason, the Tenant will go into Broken
     /// state.
     ///
-    #[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))]
+    #[instrument(skip_all, fields(tenant_id=%tenant_id))]
     pub fn spawn_load(
         conf: &'static PageServerConf,
         tenant_id: TenantId,
         broker_client: storage_broker::BrokerClientChannel,
         remote_storage: Option<GenericRemoteStorage>,
-        init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
+        init_done: Option<(completion::Completion, completion::Barrier)>,
         ctx: &RequestContext,
     ) -> Arc<Tenant> {
+        debug_assert_current_span_has_tenant_id();
+
         let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
             Ok(conf) => conf,
             Err(e) => {
@@ -931,11 +934,15 @@ impl Tenant {
             async move {
                 // keep the sender alive as long as we have the initial load ongoing; it will be
                 // None for loads spawned after init_tenant_mgr.
-                let _init_done_tx = init_done_tx;
+                let (_tx, rx) = if let Some((tx, rx)) = init_done {
+                    (Some(tx), Some(rx))
+                } else {
+                    (None, None)
+                };
                 match tenant_clone.load(&ctx).await {
                     Ok(()) => {
-                        info!("load finished, activating");
-                        tenant_clone.activate(broker_client, &ctx);
+                        debug!("load finished, activating");
+                        tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
                     }
                     Err(err) => {
                         error!("load failed, setting tenant state to Broken: {err:?}");
@@ -954,8 +961,6 @@ impl Tenant {
             }),
         );
 
-        info!("spawned load into background");
-
         tenant
     }
 
@@ -967,7 +972,7 @@ impl Tenant {
     async fn load(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
         debug_assert_current_span_has_tenant_id();
 
-        info!("loading tenant task");
+        debug!("loading tenant task");
 
         utils::failpoint_sleep_millis_async!("before-loading-tenant");
 
@@ -977,102 +982,109 @@ impl Tenant {
         //
         // Scan the directory, peek into the metadata file of each timeline, and
         // collect a list of timelines and their ancestors.
-        let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
-        let timelines_dir = self.conf.timelines_path(&self.tenant_id);
-        for entry in std::fs::read_dir(&timelines_dir).with_context(|| {
-            format!(
-                "Failed to list timelines directory for tenant {}",
-                self.tenant_id
-            )
-        })? {
-            let entry = entry.with_context(|| {
-                format!("cannot read timeline dir entry for {}", self.tenant_id)
-            })?;
-            let timeline_dir = entry.path();
-
-            if crate::is_temporary(&timeline_dir) {
-                info!(
-                    "Found temporary timeline directory, removing: {}",
-                    timeline_dir.display()
-                );
-                if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
-                    error!(
-                        "Failed to remove temporary directory '{}': {:?}",
-                        timeline_dir.display(),
-                        e
-                    );
-                }
-            } else if is_uninit_mark(&timeline_dir) {
-                let timeline_uninit_mark_file = &timeline_dir;
-                info!(
-                    "Found an uninit mark file {}, removing the timeline and its uninit mark",
-                    timeline_uninit_mark_file.display()
-                );
-                let timeline_id = timeline_uninit_mark_file
-                    .file_stem()
-                    .and_then(OsStr::to_str)
-                    .unwrap_or_default()
-                    .parse::<TimelineId>()
-                    .with_context(|| {
-                        format!(
-                            "Could not parse timeline id out of the timeline uninit mark name {}",
-                            timeline_uninit_mark_file.display()
-                        )
-                    })?;
-                let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
-                if let Err(e) =
-                    remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
-                {
-                    error!("Failed to clean up uninit marked timeline: {e:?}");
-                }
-            } else {
-                let timeline_id = timeline_dir
-                    .file_name()
-                    .and_then(OsStr::to_str)
-                    .unwrap_or_default()
-                    .parse::<TimelineId>()
-                    .with_context(|| {
-                        format!(
-                            "Could not parse timeline id out of the timeline dir name {}",
-                            timeline_dir.display()
-                        )
-                    })?;
-                let timeline_uninit_mark_file = self
-                    .conf
-                    .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
-                if timeline_uninit_mark_file.exists() {
-                    info!(
-                        "Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark",
-                        self.tenant_id, timeline_id
-                    );
-                    if let Err(e) =
-                        remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
-                    {
-                        error!("Failed to clean up uninit marked timeline: {e:?}");
-                    }
-                    continue;
-                }
-
-                let file_name = entry.file_name();
-                if let Ok(timeline_id) =
-                    file_name.to_str().unwrap_or_default().parse::<TimelineId>()
-                {
-                    let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
-                        .context("failed to load metadata")?;
-                    timelines_to_load.insert(timeline_id, metadata);
-                } else {
-                    // A file or directory that doesn't look like a timeline ID
-                    warn!(
-                        "unexpected file or directory in timelines directory: {}",
-                        file_name.to_string_lossy()
-                    );
-                }
-            }
-        }
-
-        // Sort the array of timeline IDs into tree-order, so that parent comes before
-        // all its children.
-        let sorted_timelines = tree_sort_timelines(timelines_to_load)?;
+        let tenant_id = self.tenant_id;
+        let conf = self.conf;
+        let span = info_span!("blocking");
+
+        let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
+            let _g = span.entered();
+            let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
+            let timelines_dir = conf.timelines_path(&tenant_id);
+
+            for entry in
+                std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
+            {
+                let entry = entry.context("read timeline dir entry")?;
+                let timeline_dir = entry.path();
+
+                if crate::is_temporary(&timeline_dir) {
+                    info!(
+                        "Found temporary timeline directory, removing: {}",
+                        timeline_dir.display()
+                    );
+                    if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
+                        error!(
+                            "Failed to remove temporary directory '{}': {:?}",
+                            timeline_dir.display(),
+                            e
+                        );
+                    }
+                } else if is_uninit_mark(&timeline_dir) {
+                    let timeline_uninit_mark_file = &timeline_dir;
+                    info!(
+                        "Found an uninit mark file {}, removing the timeline and its uninit mark",
+                        timeline_uninit_mark_file.display()
+                    );
+                    let timeline_id = timeline_uninit_mark_file
+                        .file_stem()
+                        .and_then(OsStr::to_str)
+                        .unwrap_or_default()
+                        .parse::<TimelineId>()
+                        .with_context(|| {
+                            format!(
+                                "Could not parse timeline id out of the timeline uninit mark name {}",
+                                timeline_uninit_mark_file.display()
+                            )
+                        })?;
+                    let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
+                    if let Err(e) =
+                        remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
+                    {
+                        error!("Failed to clean up uninit marked timeline: {e:?}");
+                    }
+                } else {
+                    let timeline_id = timeline_dir
+                        .file_name()
+                        .and_then(OsStr::to_str)
+                        .unwrap_or_default()
+                        .parse::<TimelineId>()
+                        .with_context(|| {
+                            format!(
+                                "Could not parse timeline id out of the timeline dir name {}",
+                                timeline_dir.display()
+                            )
+                        })?;
+                    let timeline_uninit_mark_file =
+                        conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
+                    if timeline_uninit_mark_file.exists() {
+                        info!(
+                            %timeline_id,
+                            "Found an uninit mark file, removing the timeline and its uninit mark",
+                        );
+                        if let Err(e) = remove_timeline_and_uninit_mark(
+                            &timeline_dir,
+                            &timeline_uninit_mark_file,
+                        ) {
+                            error!("Failed to clean up uninit marked timeline: {e:?}");
+                        }
+                        continue;
+                    }
+
+                    let file_name = entry.file_name();
+                    if let Ok(timeline_id) =
+                        file_name.to_str().unwrap_or_default().parse::<TimelineId>()
+                    {
+                        let metadata = load_metadata(conf, timeline_id, tenant_id)
+                            .context("failed to load metadata")?;
+                        timelines_to_load.insert(timeline_id, metadata);
+                    } else {
+                        // A file or directory that doesn't look like a timeline ID
+                        warn!(
+                            "unexpected file or directory in timelines directory: {}",
+                            file_name.to_string_lossy()
+                        );
+                    }
+                }
+            }
+
+            // Sort the array of timeline IDs into tree-order, so that parent comes before
+            // all its children.
+            tree_sort_timelines(timelines_to_load)
+        })
+        .await
+        .context("load spawn_blocking")
+        .and_then(|res| res)?;
+
         // FIXME original collect_timeline_files contained one more check:
         //  1. "Timeline has no ancestor and no layer files"
@@ -1082,7 +1094,7 @@ impl Tenant {
             .with_context(|| format!("load local timeline {timeline_id}"))?;
         }
 
-        info!("Done");
+        trace!("Done");
 
         Ok(())
     }
 
@@ -1670,7 +1682,12 @@ impl Tenant {
     }
 
     /// Changes tenant status to active, unless shutdown was already requested.
-    fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
+    fn activate(
+        self: &Arc<Self>,
+        broker_client: BrokerClientChannel,
+        init_done: Option<&completion::Barrier>,
+        ctx: &RequestContext,
+    ) {
         debug_assert_current_span_has_tenant_id();
 
         let mut activating = false;
@@ -1701,7 +1718,7 @@ impl Tenant {
 
             // Spawn gc and compaction loops. The loops will shut themselves
            // down when they notice that the tenant is inactive.
-            tasks::start_background_loops(self);
+            tasks::start_background_loops(self, init_done);
 
             let mut activated_timelines = 0;
 
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index d74a025bbb..d3cd914037 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -25,6 +25,7 @@ use crate::tenant::{
 };
 use crate::IGNORED_TENANT_FILE_NAME;
 
+use utils::completion;
 use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};
 
@@ -66,7 +67,7 @@ pub async fn init_tenant_mgr(
     conf: &'static PageServerConf,
     broker_client: storage_broker::BrokerClientChannel,
     remote_storage: Option<GenericRemoteStorage>,
-    init_done_tx: tokio::sync::mpsc::Sender<()>,
+    init_done: (completion::Completion, completion::Barrier),
 ) -> anyhow::Result<()> {
     // Scan local filesystem for attached tenants
     let tenants_dir = conf.tenants_path();
@@ -123,7 +124,7 @@ pub async fn init_tenant_mgr(
                     &tenant_dir_path,
                     broker_client.clone(),
                     remote_storage.clone(),
-                    Some(init_done_tx.clone()),
+                    Some(init_done.clone()),
                     &ctx,
                 ) {
                     Ok(tenant) => {
@@ -159,7 +160,7 @@ pub fn schedule_local_tenant_processing(
     tenant_path: &Path,
     broker_client: storage_broker::BrokerClientChannel,
     remote_storage: Option<GenericRemoteStorage>,
-    init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
+    init_done: Option<(completion::Completion, completion::Barrier)>,
     ctx: &RequestContext,
 ) -> anyhow::Result<Arc<Tenant>> {
     anyhow::ensure!(
@@ -218,7 +219,7 @@ pub fn schedule_local_tenant_processing(
             tenant_id,
             broker_client,
             remote_storage,
-            init_done_tx,
+            init_done,
             ctx,
         )
     };
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index b3c8a4a3bb..02aed11114 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -12,8 +12,9 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::{Tenant, TenantState};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
+use utils::completion;
 
-pub fn start_background_loops(tenant: &Arc<Tenant>) {
+pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
     let tenant_id = tenant.tenant_id;
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
@@ -24,7 +25,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
         false,
         {
             let tenant = Arc::clone(tenant);
+            let init_done = init_done.cloned();
             async move {
+                completion::Barrier::maybe_wait(init_done).await;
                 compaction_loop(tenant)
                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
                     .await;
@@ -41,7 +44,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
         false,
         {
             let tenant = Arc::clone(tenant);
+            let init_done = init_done.cloned();
             async move {
+                completion::Barrier::maybe_wait(init_done).await;
                 gc_loop(tenant)
                     .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
                     .await;
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 5c889e804c..ee7b002450 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -2728,7 +2728,7 @@ impl Timeline {
     }
 
     /// Flush one frozen in-memory layer to disk, as a new delta layer.
-    #[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
+    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
     async fn flush_frozen_layer(
         self: &Arc<Self>,
         frozen_layer: Arc<InMemoryLayer>,
@@ -2752,9 +2752,14 @@ impl Timeline {
             // normal case, write out a L0 delta layer file.
             let this = self.clone();
             let frozen_layer = frozen_layer.clone();
-            let (delta_path, metadata) =
-                tokio::task::spawn_blocking(move || this.create_delta_layer(&frozen_layer))
-                    .await??;
+            let span = tracing::info_span!("blocking");
+            let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
+                let _g = span.entered();
+                this.create_delta_layer(&frozen_layer)
+            })
+            .await
+            .context("create_delta_layer spawn_blocking")
+            .and_then(|res| res)?;
             HashMap::from([(delta_path, metadata)])
         };
 
@@ -3523,14 +3528,18 @@ impl Timeline {
         let this = self.clone();
         let ctx_inner = ctx.clone();
         let layer_removal_cs_inner = layer_removal_cs.clone();
+        let span = tracing::info_span!("blocking");
         let CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
         } = tokio::task::spawn_blocking(move || {
+            let _g = span.entered();
             this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
         })
         .await
-        .unwrap()?;
+        .context("compact_level0_phase1 spawn_blocking")
+        .map_err(CompactionError::Other)
+        .and_then(|res| res)?;
 
         if new_layers.is_empty() && deltas_to_compact.is_empty() {
             // nothing to do
diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py
index 59b7b574cd..15712b9e55 100644
--- a/test_runner/regress/test_tenants.py
+++ b/test_runner/regress/test_tenants.py
@@ -309,7 +309,7 @@ def test_pageserver_with_empty_tenants(
     env.pageserver.allowed_errors.append(
         ".*marking .* as locally complete, while it doesnt exist in remote index.*"
     )
-    env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
+    env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
 
     client = env.pageserver.http_client()
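
How the new `utils::completion` pair behaves: every tenant being loaded holds a clone of the `Completion`, and anything that must wait for the initial load awaits a `Barrier` clone, which resolves only once every `Completion` clone has been dropped. Below is a minimal, self-contained sketch of the same mechanism; the module is inlined so that only `tokio` is required, and the simulated "load" and gated task are illustrative stand-ins, not code from the patch:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[derive(Clone)]
struct Completion(mpsc::Sender<()>);

#[derive(Clone)]
struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);

impl Barrier {
    async fn wait(self) {
        // recv() yields None once every Completion (sender) clone is dropped;
        // nothing is ever sent on the channel, dropping is the whole signal.
        self.0.lock().await.recv().await;
    }
}

fn channel() -> (Completion, Barrier) {
    let (tx, rx) = mpsc::channel::<()>(1);
    (Completion(tx), Barrier(Arc::new(Mutex::new(rx))))
}

#[tokio::main]
async fn main() {
    let (init_done, init_done_rx) = channel();

    // Stands in for the gc/compaction/eviction loops gated on initial load.
    let gated = tokio::spawn({
        let barrier = init_done_rx.clone();
        async move {
            barrier.wait().await;
            println!("initial load complete; background task may run");
        }
    });

    // Stands in for a loading tenant: it holds a Completion clone, and the
    // barrier opens when the last clone goes out of scope.
    let load = tokio::spawn(async move {
        let _guard = init_done;
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    });

    load.await.unwrap();
    gated.await.unwrap();
}
```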
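
The `spawn_blocking` call sites in `tenant.rs` and `timeline.rs` all follow one pattern: capture a tracing span on the async side, enter it inside the blocking closure so logs stay attached to the caller's context, and replace `unwrap()` with `context(..)` plus `and_then(|res| res)` to flatten the `Result<Result<_, _>, JoinError>` nesting. A sketch of that pattern in isolation; the function name and directory-counting body are invented for illustration, and it assumes `tokio`, `tracing`, and `anyhow` as dependencies:

```rust
use anyhow::Context;

async fn scan_dir_blocking(path: std::path::PathBuf) -> anyhow::Result<usize> {
    // Created on the async caller's side so the blocking thread inherits it.
    let span = tracing::info_span!("blocking");
    tokio::task::spawn_blocking(move || {
        let _g = span.entered();
        let mut entries = 0;
        for entry in std::fs::read_dir(&path).context("list directory")? {
            let _entry = entry.context("read dir entry")?;
            entries += 1;
        }
        Ok(entries)
    })
    .await
    // Outer error means the blocking task panicked or was cancelled
    // (JoinError); and_then flattens it with the closure's own Result.
    .context("scan_dir spawn_blocking")
    .and_then(|res| res)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let entries = scan_dir_blocking(".".into()).await?;
    println!("{entries} entries");
    Ok(())
}
```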