compute_ctl: Perform more startup actions in parallel (#11008)

To speed up compute startup. Resizing swap in particular takes about 100
ms on my laptop. By performing it in parallel with downloading the
basebackup, that latency is effectively hidden. I would imagine that
downloading remote extensions can also take a non-trivial amount of
time, although I didn't try to measure that. In any case that's now also
performed in parallel with downloading the basebackup.
This commit is contained in:
Heikki Linnakangas
2025-03-03 02:29:37 +02:00
committed by GitHub
parent 066324d6ec
commit 38ddfab643

View File

@@ -23,7 +23,7 @@ use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;
use tracing::{debug, error, info, instrument, warn};
use tracing::{Instrument, debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -522,9 +522,13 @@ impl ComputeNode {
/// - status is left in ComputeStatus::Init. The caller is responsible for setting it to Failed
/// - if Postgres was started before the fatal error happened, self.running_postgres is
/// set. The caller is responsible for killing it.
///
/// Note that this is in the critical path of a compute cold start. Keep this fast.
/// Try to do things concurrently, to hide the latencies.
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
let compute_state: ComputeState;
let start_compute_span;
let _this_entered;
{
let mut state_guard = self.state.lock().unwrap();
@@ -536,7 +540,7 @@ impl ComputeNode {
// startup to be considered part of the /configure request.
//
// Similarly, if a trace ID was passed in env variables, attach it to the span.
_this_entered = {
start_compute_span = {
// Temporarily enter the parent span, so that the new span becomes its child.
if let Some(p) = state_guard.startup_span.take() {
let _parent_entered = p.entered();
@@ -549,8 +553,8 @@ impl ComputeNode {
} else {
tracing::info_span!("start_compute")
}
}
.entered();
};
_this_entered = start_compute_span.enter();
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
compute_state = state_guard.clone()
@@ -567,23 +571,44 @@ impl ComputeNode {
pspec.spec.remote_extensions,
);
// Launch remaining service threads
let _monitor_handle = launch_monitor(self);
let _configurator_handle = launch_configurator(self);
////// PRE-STARTUP PHASE: things that need to be finished before we start the Postgres process
// Collect all the tasks that must finish here
let mut pre_tasks = tokio::task::JoinSet::new();
// If there are any remote extensions in shared_preload_libraries, start downloading them
if pspec.spec.remote_extensions.is_some() {
let (this, spec) = (self.clone(), pspec.spec.clone());
pre_tasks.spawn(async move {
this.download_preload_extensions(&spec)
.in_current_span()
.await
});
}
// Prepare pgdata directory. This downloads the basebackup, among other things.
{
let (this, cs) = (self.clone(), compute_state.clone());
pre_tasks.spawn_blocking_child(move || this.prepare_pgdata(&cs));
}
// Resize swap to the desired size if the compute spec says so
if let (Some(size_bytes), true) =
(pspec.spec.swap_size_bytes, self.params.resize_swap_on_bind)
{
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
resize_swap(size_bytes).context("failed to resize swap")?;
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
pre_tasks.spawn_blocking_child(move || {
// To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
// *before* starting postgres.
//
// In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
// carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
// OOM-killed during startup because swap wasn't available yet.
resize_swap(size_bytes).context("failed to resize swap")?;
let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%size_bytes, %size_mib, "resized swap");
Ok::<(), anyhow::Error>(())
});
}
// Set disk quota if the compute spec says so
@@ -591,10 +616,15 @@ impl ComputeNode {
pspec.spec.disk_quota_bytes,
self.params.set_disk_quota_for_fs.as_ref(),
) {
set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint)
.context("failed to set disk quota")?;
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
let disk_quota_fs_mountpoint = disk_quota_fs_mountpoint.clone();
pre_tasks.spawn_blocking_child(move || {
set_disk_quota(disk_quota_bytes, &disk_quota_fs_mountpoint)
.context("failed to set disk quota")?;
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");
Ok::<(), anyhow::Error>(())
});
}
// tune pgbouncer
@@ -628,37 +658,17 @@ impl ComputeNode {
});
}
// This part is sync, because we need to download
// remote shared_preload_libraries before postgres start (if any)
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
// First, create control files for all availale extensions
extension_server::create_control_files(remote_extensions, &self.params.pgbin);
// Launch remaining service threads
let _monitor_handle = launch_monitor(self);
let _configurator_handle = launch_configurator(self);
let library_load_start_time = Utc::now();
let rt = tokio::runtime::Handle::current();
let remote_ext_metrics = rt.block_on(self.prepare_preload_libraries(&pspec.spec))?;
let library_load_time = Utc::now()
.signed_duration_since(library_load_start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let mut state = self.state.lock().unwrap();
state.metrics.load_ext_ms = library_load_time;
state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
info!(
"Loading shared_preload_libraries took {:?}ms",
library_load_time
);
info!("{:?}", remote_ext_metrics);
// Wait for all the pre-tasks to finish before starting postgres
let rt = tokio::runtime::Handle::current();
while let Some(res) = rt.block_on(pre_tasks.join_next()) {
res??;
}
// Prepre pgdata directory. This downloads the basebackup, among other things.
self.prepare_pgdata(&compute_state)?;
// Start Postgres
////// START POSTGRES
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
let postmaster_pid = pg_process.pid();
@@ -669,6 +679,7 @@ impl ComputeNode {
let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary {
self.configure_as_primary(&compute_state)?;
let conf = self.get_conn_conf(None);
tokio::task::spawn_blocking(|| {
let res = get_installed_extensions(conf);
@@ -714,6 +725,39 @@ impl ComputeNode {
Ok(())
}
#[instrument(skip_all)]
async fn download_preload_extensions(&self, spec: &ComputeSpec) -> Result<()> {
let remote_extensions = if let Some(remote_extensions) = &spec.remote_extensions {
remote_extensions
} else {
return Ok(());
};
// First, create control files for all available extensions
extension_server::create_control_files(remote_extensions, &self.params.pgbin);
let library_load_start_time = Utc::now();
let remote_ext_metrics = self.prepare_preload_libraries(spec).await?;
let library_load_time = Utc::now()
.signed_duration_since(library_load_start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let mut state = self.state.lock().unwrap();
state.metrics.load_ext_ms = library_load_time;
state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded;
state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size;
state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size;
info!(
"Loading shared_preload_libraries took {:?}ms",
library_load_time
);
info!("{:?}", remote_ext_metrics);
Ok(())
}
/// Start the vm-monitor if directed to. The vm-monitor only runs on linux
/// because it requires cgroups.
fn start_vm_monitor(&self, disable_lfc_resizing: bool) -> StartVmMonitorResult {
@@ -1574,21 +1618,6 @@ impl ComputeNode {
}
self.post_apply_config()?;
let conf = self.get_conn_conf(None);
tokio::task::spawn_blocking(|| {
let res = get_installed_extensions(conf);
match res {
Ok(extensions) => {
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&extensions)
.expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err:?}"),
}
});
Ok(())
}
@@ -2030,3 +2059,26 @@ pub fn forward_termination_signal() {
kill(pg_pid, Signal::SIGINT).ok();
}
}
// helper trait to call JoinSet::spawn_blocking(f), but propagates the current
// tracing span to the thread.
trait JoinSetExt<T> {
fn spawn_blocking_child<F>(&mut self, f: F) -> tokio::task::AbortHandle
where
F: FnOnce() -> T + Send + 'static,
T: Send;
}
impl<T: 'static> JoinSetExt<T> for tokio::task::JoinSet<T> {
fn spawn_blocking_child<F>(&mut self, f: F) -> tokio::task::AbortHandle
where
F: FnOnce() -> T + Send + 'static,
T: Send,
{
let sp = tracing::Span::current();
self.spawn_blocking(move || {
let _e = sp.enter();
f()
})
}
}