Continued startup speedup (#4372)

Startup continues to be slow; this is more work toward alleviating it.

Summary of changes:

- promote the functional improvements from #4366 into
`utils::completion::{Completion, Barrier}`
- extend "initial load completion" usage up to tenant background tasks
    - previously only global background tasks
- spawn_blocking the tenant load directory traversal
- demote some logging
- remove some unwraps
- propagate some spans to `spawn_blocking`

Runtime effects should be a major speedup to loading, but after that, the
`BACKGROUND_RUNTIME` will be blocked for a long time (minutes). Possible
follow-ups:
- complete initial tenant sizes before allowing background tasks to
block the `BACKGROUND_RUNTIME`
Author: Joonas Koivunen
Date: 2023-05-30 16:25:07 +03:00
Committed by: GitHub
Parent: 210be6b6ab
Commit: f4db85de40
9 changed files with 181 additions and 116 deletions

libs/utils/src/completion.rs (new file)

@@ -0,0 +1,33 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);

/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);

impl Barrier {
    pub async fn wait(self) {
        self.0.lock().await.recv().await;
    }

    pub async fn maybe_wait(barrier: Option<Barrier>) {
        if let Some(b) = barrier {
            b.wait().await
        }
    }
}

/// Create a new `Completion` and `Barrier` pair.
pub fn channel() -> (Completion, Barrier) {
    let (tx, rx) = mpsc::channel::<()>(1);
    let rx = Mutex::new(rx);
    let rx = Arc::new(rx);
    (Completion(tx), Barrier(rx))
}
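
The pair behaves like a reverse semaphore: `Barrier::wait` resolves only once every `Completion` clone has been dropped, because dropping the last `mpsc::Sender` closes the channel and `recv()` returns `None`. A minimal usage sketch, assuming the `utils` crate above is in scope (the spawned task bodies are hypothetical, not part of this commit):

use utils::completion;

#[tokio::main]
async fn main() {
    let (completion, barrier) = completion::channel();

    for i in 0..3 {
        // Each in-flight load holds its own clone of the Completion.
        let guard = completion.clone();
        tokio::spawn(async move {
            // ... the initial load work would happen here ...
            println!("load {i} done");
            drop(guard); // dropping the clone signals this load is finished
        });
    }

    // Drop the original so only the spawned tasks keep the channel open.
    drop(completion);

    // Resolves once every Completion clone has been dropped.
    barrier.wait().await;
    println!("initial load complete");
}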

libs/utils/src/lib.rs

@@ -60,6 +60,9 @@ pub mod tracing_span_assert;
 pub mod rate_limit;
 
+/// Simple once-barrier and a guard which keeps barrier awaiting.
+pub mod completion;
+
 mod failpoint_macro_helpers {
     /// use with fail::cfg("$name", "return(2000)")

pageserver/src/bin/pageserver.rs

@@ -338,8 +338,7 @@ fn start_pageserver(
     // All tenant load operations carry this while they are ongoing; it will be dropped once those
     // operations finish either successfully or in some other manner. However, the initial load
     // will be then done, and we can start the global background tasks.
-    let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
-    let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));
+    let (init_done_tx, init_done_rx) = utils::completion::channel();
 
     // Scan the local 'tenants/' directory and start loading the tenants
     let init_started_at = std::time::Instant::now();
@@ -347,14 +346,13 @@ fn start_pageserver(
         conf,
         broker_client.clone(),
         remote_storage.clone(),
-        init_done_tx,
+        (init_done_tx, init_done_rx.clone()),
     ))?;
 
     BACKGROUND_RUNTIME.spawn({
         let init_done_rx = init_done_rx.clone();
         async move {
-            let init_done = async move { init_done_rx.lock().await.recv().await };
-            init_done.await;
+            init_done_rx.wait().await;
 
             let elapsed = init_started_at.elapsed();
@@ -435,8 +433,7 @@ fn start_pageserver(
             // this is because we only process active tenants and timelines, and the
             // Timeline::get_current_logical_size will spawn the logical size calculation,
             // which will not be rate-limited.
-            let init_done = async move { init_done_rx.lock().await.recv().await };
-            init_done.await;
+            init_done_rx.wait().await;
 
             pageserver::consumption_metrics::collect_metrics(
                 metric_collection_endpoint,

pageserver/src/disk_usage_eviction_task.rs

@@ -54,6 +54,7 @@ use serde::{Deserialize, Serialize};
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, instrument, warn, Instrument};
+use utils::completion;
 use utils::serde_percent::Percent;
 
 use crate::{
@@ -82,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task(
     conf: &'static PageServerConf,
     storage: GenericRemoteStorage,
     state: Arc<State>,
-    init_done_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<()>>>,
+    init_done: completion::Barrier,
 ) -> anyhow::Result<()> {
     let Some(task_config) = &conf.disk_usage_based_eviction else {
         info!("disk usage based eviction task not configured");
@@ -100,8 +101,7 @@ pub fn launch_disk_usage_global_eviction_task(
         false,
         async move {
             // wait until initial load is complete, because we cannot evict from loading tenants.
-            let init_done = async move { init_done_rx.lock().await.recv().await };
-            init_done.await;
+            init_done.wait().await;
 
             disk_usage_eviction_task(
                 &state,

pageserver/src/tenant.rs

@@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel;
 use tokio::sync::watch;
 use tokio::task::JoinSet;
 use tracing::*;
+use utils::completion;
 use utils::crashsafe::path_with_suffix_extension;
 
 use std::cmp::min;
@@ -653,7 +654,7 @@ impl Tenant {
             match tenant_clone.attach(&ctx).await {
                 Ok(()) => {
                     info!("attach finished, activating");
-                    tenant_clone.activate(broker_client, &ctx);
+                    tenant_clone.activate(broker_client, None, &ctx);
                 }
                 Err(e) => {
                     error!("attach failed, setting tenant state to Broken: {:?}", e);
@@ -889,15 +890,17 @@ impl Tenant {
     /// If the loading fails for some reason, the Tenant will go into Broken
     /// state.
     ///
-    #[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))]
+    #[instrument(skip_all, fields(tenant_id=%tenant_id))]
     pub fn spawn_load(
         conf: &'static PageServerConf,
         tenant_id: TenantId,
         broker_client: storage_broker::BrokerClientChannel,
         remote_storage: Option<GenericRemoteStorage>,
-        init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
+        init_done: Option<(completion::Completion, completion::Barrier)>,
         ctx: &RequestContext,
     ) -> Arc<Tenant> {
+        debug_assert_current_span_has_tenant_id();
+
         let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
             Ok(conf) => conf,
             Err(e) => {
@@ -931,11 +934,15 @@ impl Tenant {
             async move {
                 // keep the sender alive as long as we have the initial load ongoing; it will be
                 // None for loads spawned after init_tenant_mgr.
-                let _init_done_tx = init_done_tx;
+                let (_tx, rx) = if let Some((tx, rx)) = init_done {
+                    (Some(tx), Some(rx))
+                } else {
+                    (None, None)
+                };
                 match tenant_clone.load(&ctx).await {
                     Ok(()) => {
-                        info!("load finished, activating");
-                        tenant_clone.activate(broker_client, &ctx);
+                        debug!("load finished, activating");
+                        tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
                     }
                     Err(err) => {
                         error!("load failed, setting tenant state to Broken: {err:?}");
@@ -954,8 +961,6 @@ impl Tenant {
             }),
         );
 
-        info!("spawned load into background");
-
         tenant
     }
@@ -967,7 +972,7 @@ impl Tenant {
     async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
         debug_assert_current_span_has_tenant_id();
 
-        info!("loading tenant task");
+        debug!("loading tenant task");
 
         utils::failpoint_sleep_millis_async!("before-loading-tenant");
@@ -977,102 +982,109 @@ impl Tenant {
         //
         // Scan the directory, peek into the metadata file of each timeline, and
         // collect a list of timelines and their ancestors.
-        let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
-        let timelines_dir = self.conf.timelines_path(&self.tenant_id);
-        for entry in std::fs::read_dir(&timelines_dir).with_context(|| {
-            format!(
-                "Failed to list timelines directory for tenant {}",
-                self.tenant_id
-            )
-        })? {
-            let entry = entry.with_context(|| {
-                format!("cannot read timeline dir entry for {}", self.tenant_id)
-            })?;
-            let timeline_dir = entry.path();
-            if crate::is_temporary(&timeline_dir) {
-                info!(
-                    "Found temporary timeline directory, removing: {}",
-                    timeline_dir.display()
-                );
-                if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
-                    error!(
-                        "Failed to remove temporary directory '{}': {:?}",
-                        timeline_dir.display(),
-                        e
-                    );
-                }
-            } else if is_uninit_mark(&timeline_dir) {
-                let timeline_uninit_mark_file = &timeline_dir;
-                info!(
-                    "Found an uninit mark file {}, removing the timeline and its uninit mark",
-                    timeline_uninit_mark_file.display()
-                );
-                let timeline_id = timeline_uninit_mark_file
-                    .file_stem()
-                    .and_then(OsStr::to_str)
-                    .unwrap_or_default()
-                    .parse::<TimelineId>()
-                    .with_context(|| {
-                        format!(
-                            "Could not parse timeline id out of the timeline uninit mark name {}",
-                            timeline_uninit_mark_file.display()
-                        )
-                    })?;
-                let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
-                if let Err(e) =
-                    remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
-                {
-                    error!("Failed to clean up uninit marked timeline: {e:?}");
-                }
-            } else {
-                let timeline_id = timeline_dir
-                    .file_name()
-                    .and_then(OsStr::to_str)
-                    .unwrap_or_default()
-                    .parse::<TimelineId>()
-                    .with_context(|| {
-                        format!(
-                            "Could not parse timeline id out of the timeline dir name {}",
-                            timeline_dir.display()
-                        )
-                    })?;
-                let timeline_uninit_mark_file = self
-                    .conf
-                    .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
-                if timeline_uninit_mark_file.exists() {
-                    info!(
-                        "Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark",
-                        self.tenant_id, timeline_id
-                    );
-                    if let Err(e) =
-                        remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
-                    {
-                        error!("Failed to clean up uninit marked timeline: {e:?}");
-                    }
-                    continue;
-                }
-                let file_name = entry.file_name();
-                if let Ok(timeline_id) =
-                    file_name.to_str().unwrap_or_default().parse::<TimelineId>()
-                {
-                    let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
-                        .context("failed to load metadata")?;
-                    timelines_to_load.insert(timeline_id, metadata);
-                } else {
-                    // A file or directory that doesn't look like a timeline ID
-                    warn!(
-                        "unexpected file or directory in timelines directory: {}",
-                        file_name.to_string_lossy()
-                    );
-                }
-            }
-        }
-
-        // Sort the array of timeline IDs into tree-order, so that parent comes before
-        // all its children.
-        let sorted_timelines = tree_sort_timelines(timelines_to_load)?;
+        let tenant_id = self.tenant_id;
+        let conf = self.conf;
+        let span = info_span!("blocking");
+        let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
+            let _g = span.entered();
+            let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
+            let timelines_dir = conf.timelines_path(&tenant_id);
+            for entry in
+                std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
+            {
+                let entry = entry.context("read timeline dir entry")?;
+                let timeline_dir = entry.path();
+                if crate::is_temporary(&timeline_dir) {
+                    info!(
+                        "Found temporary timeline directory, removing: {}",
+                        timeline_dir.display()
+                    );
+                    if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
+                        error!(
+                            "Failed to remove temporary directory '{}': {:?}",
+                            timeline_dir.display(),
+                            e
+                        );
+                    }
+                } else if is_uninit_mark(&timeline_dir) {
+                    let timeline_uninit_mark_file = &timeline_dir;
+                    info!(
+                        "Found an uninit mark file {}, removing the timeline and its uninit mark",
+                        timeline_uninit_mark_file.display()
+                    );
+                    let timeline_id = timeline_uninit_mark_file
+                        .file_stem()
+                        .and_then(OsStr::to_str)
+                        .unwrap_or_default()
+                        .parse::<TimelineId>()
+                        .with_context(|| {
+                            format!(
+                                "Could not parse timeline id out of the timeline uninit mark name {}",
+                                timeline_uninit_mark_file.display()
+                            )
+                        })?;
+                    let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
+                    if let Err(e) =
+                        remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
+                    {
+                        error!("Failed to clean up uninit marked timeline: {e:?}");
+                    }
+                } else {
+                    let timeline_id = timeline_dir
+                        .file_name()
+                        .and_then(OsStr::to_str)
+                        .unwrap_or_default()
+                        .parse::<TimelineId>()
+                        .with_context(|| {
+                            format!(
+                                "Could not parse timeline id out of the timeline dir name {}",
+                                timeline_dir.display()
+                            )
+                        })?;
+                    let timeline_uninit_mark_file =
+                        conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
+                    if timeline_uninit_mark_file.exists() {
+                        info!(
+                            %timeline_id,
+                            "Found an uninit mark file, removing the timeline and its uninit mark",
+                        );
+                        if let Err(e) = remove_timeline_and_uninit_mark(
+                            &timeline_dir,
+                            &timeline_uninit_mark_file,
+                        ) {
+                            error!("Failed to clean up uninit marked timeline: {e:?}");
+                        }
+                        continue;
+                    }
+                    let file_name = entry.file_name();
+                    if let Ok(timeline_id) =
+                        file_name.to_str().unwrap_or_default().parse::<TimelineId>()
+                    {
+                        let metadata = load_metadata(conf, timeline_id, tenant_id)
+                            .context("failed to load metadata")?;
+                        timelines_to_load.insert(timeline_id, metadata);
+                    } else {
+                        // A file or directory that doesn't look like a timeline ID
+                        warn!(
+                            "unexpected file or directory in timelines directory: {}",
+                            file_name.to_string_lossy()
+                        );
+                    }
+                }
+            }
+
+            // Sort the array of timeline IDs into tree-order, so that parent comes before
+            // all its children.
+            tree_sort_timelines(timelines_to_load)
+        })
+        .await
+        .context("load spawn_blocking")
+        .and_then(|res| res)?;
 
         // FIXME original collect_timeline_files contained one more check:
         //    1. "Timeline has no ancestor and no layer files"
@@ -1082,7 +1094,7 @@ impl Tenant {
                 .with_context(|| format!("load local timeline {timeline_id}"))?;
         }
 
-        info!("Done");
+        trace!("Done");
 
         Ok(())
     }
@@ -1670,7 +1682,12 @@ impl Tenant {
     }
 
     /// Changes tenant status to active, unless shutdown was already requested.
-    fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
+    fn activate(
+        self: &Arc<Self>,
+        broker_client: BrokerClientChannel,
+        init_done: Option<&completion::Barrier>,
+        ctx: &RequestContext,
+    ) {
         debug_assert_current_span_has_tenant_id();
 
         let mut activating = false;
@@ -1701,7 +1718,7 @@ impl Tenant {
             // Spawn gc and compaction loops. The loops will shut themselves
             // down when they notice that the tenant is inactive.
-            tasks::start_background_loops(self);
+            tasks::start_background_loops(self, init_done);
 
             let mut activated_timelines = 0;
pageserver/src/tenant/mgr.rs

@@ -25,6 +25,7 @@ use crate::tenant::{
 };
 use crate::IGNORED_TENANT_FILE_NAME;
 
+use utils::completion;
 use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};
@@ -66,7 +67,7 @@ pub async fn init_tenant_mgr(
     conf: &'static PageServerConf,
     broker_client: storage_broker::BrokerClientChannel,
     remote_storage: Option<GenericRemoteStorage>,
-    init_done_tx: tokio::sync::mpsc::Sender<()>,
+    init_done: (completion::Completion, completion::Barrier),
 ) -> anyhow::Result<()> {
     // Scan local filesystem for attached tenants
     let tenants_dir = conf.tenants_path();
@@ -123,7 +124,7 @@ pub async fn init_tenant_mgr(
             &tenant_dir_path,
             broker_client.clone(),
             remote_storage.clone(),
-            Some(init_done_tx.clone()),
+            Some(init_done.clone()),
             &ctx,
         ) {
             Ok(tenant) => {
@@ -159,7 +160,7 @@ pub fn schedule_local_tenant_processing(
     tenant_path: &Path,
     broker_client: storage_broker::BrokerClientChannel,
     remote_storage: Option<GenericRemoteStorage>,
-    init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
+    init_done: Option<(completion::Completion, completion::Barrier)>,
     ctx: &RequestContext,
 ) -> anyhow::Result<Arc<Tenant>> {
     anyhow::ensure!(
@@ -218,7 +219,7 @@ pub fn schedule_local_tenant_processing(
             tenant_id,
             broker_client,
             remote_storage,
-            init_done_tx,
+            init_done,
             ctx,
         )
     };

pageserver/src/tenant/tasks.rs

@@ -12,8 +12,9 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::{Tenant, TenantState};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
+use utils::completion;
 
-pub fn start_background_loops(tenant: &Arc<Tenant>) {
+pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
     let tenant_id = tenant.tenant_id;
 
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
@@ -24,7 +25,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
         false,
         {
             let tenant = Arc::clone(tenant);
+            let init_done = init_done.cloned();
             async move {
+                completion::Barrier::maybe_wait(init_done).await;
                 compaction_loop(tenant)
                     .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
                     .await;
@@ -41,7 +44,9 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
         false,
         {
             let tenant = Arc::clone(tenant);
+            let init_done = init_done.cloned();
             async move {
+                completion::Barrier::maybe_wait(init_done).await;
                 gc_loop(tenant)
                     .instrument(info_span!("gc_loop", tenant_id = %tenant_id))
                     .await;
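
The `Option<&completion::Barrier>` threading lets the same loop code serve both paths: loops spawned during initial load receive `Some` and park in `maybe_wait` until the barrier releases, while tenants created or attached after startup pass `None` and start immediately. A hedged sketch of the gating (the entry-point name is illustrative, not from this diff):

use utils::completion;

async fn background_loop_entry(init_done: Option<completion::Barrier>) {
    // No-op for None; otherwise waits until all Completion clones are dropped.
    completion::Barrier::maybe_wait(init_done).await;
    // ... gc / compaction iterations begin only after this point ...
}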

pageserver/src/tenant/timeline.rs

@@ -2728,7 +2728,7 @@ impl Timeline {
     }
 
     /// Flush one frozen in-memory layer to disk, as a new delta layer.
-    #[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
+    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
     async fn flush_frozen_layer(
         self: &Arc<Self>,
         frozen_layer: Arc<InMemoryLayer>,
@@ -2752,9 +2752,14 @@ impl Timeline {
             // normal case, write out a L0 delta layer file.
             let this = self.clone();
             let frozen_layer = frozen_layer.clone();
-            let (delta_path, metadata) =
-                tokio::task::spawn_blocking(move || this.create_delta_layer(&frozen_layer))
-                    .await??;
+            let span = tracing::info_span!("blocking");
+            let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
+                let _g = span.entered();
+                this.create_delta_layer(&frozen_layer)
+            })
+            .await
+            .context("create_delta_layer spawn_blocking")
+            .and_then(|res| res)?;
 
             HashMap::from([(delta_path, metadata)])
         };
@@ -3523,14 +3528,18 @@ impl Timeline {
         let this = self.clone();
         let ctx_inner = ctx.clone();
         let layer_removal_cs_inner = layer_removal_cs.clone();
+        let span = tracing::info_span!("blocking");
         let CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
         } = tokio::task::spawn_blocking(move || {
+            let _g = span.entered();
             this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
         })
         .await
-        .unwrap()?;
+        .context("compact_level0_phase1 spawn_blocking")
+        .map_err(CompactionError::Other)
+        .and_then(|res| res)?;
 
         if new_layers.is_empty() && deltas_to_compact.is_empty() {
             // nothing to do
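
Both `spawn_blocking` call sites above follow the same pattern: capture the current tracing span, enter it on the blocking thread so log lines keep their tenant/timeline fields, and flatten the nested `Result<Result<_, _>, JoinError>` with `.and_then(|res| res)` instead of `.unwrap()`. A distilled sketch of the pattern (function and error strings are illustrative, not from this diff):

use anyhow::Context;

async fn run_blocking_step() -> anyhow::Result<u64> {
    // Propagate the caller's span onto the blocking thread so logs emitted
    // there stay attributed to the surrounding instrumented task.
    let span = tracing::info_span!("blocking");
    tokio::task::spawn_blocking(move || {
        let _g = span.entered();
        // CPU- or IO-heavy work that must not block the async runtime.
        expensive_scan()
    })
    .await
    // A JoinError (panic or cancellation) becomes an anyhow::Error ...
    .context("expensive_scan spawn_blocking")
    // ... and the inner Result is flattened instead of unwrap()'d.
    .and_then(|res| res)
}

fn expensive_scan() -> anyhow::Result<u64> {
    Ok(42) // placeholder for the real blocking work
}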

test_runner/regress: test_pageserver_with_empty_tenants

@@ -309,7 +309,7 @@ def test_pageserver_with_empty_tenants(
     env.pageserver.allowed_errors.append(
         ".*marking .* as locally complete, while it doesnt exist in remote index.*"
     )
-    env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*")
+    env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
 
     client = env.pageserver.http_client()