diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 9e065e84a4..a89d3345c1 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -23,7 +23,7 @@ use postgres::NoTls; use postgres::error::SqlState; use remote_storage::{DownloadError, RemotePath}; use tokio::spawn; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{Instrument, debug, error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use utils::measured_stream::MeasuredReader; @@ -522,9 +522,13 @@ impl ComputeNode { /// - status is left in ComputeStatus::Init. The caller is responsible for setting it to Failed /// - if Postgres was started before the fatal error happened, self.running_postgres is /// set. The caller is responsible for killing it. + /// + /// Note that this is in the critical path of a compute cold start. Keep this fast. + /// Try to do things concurrently, to hide the latencies. fn start_compute(self: &Arc, pg_handle: &mut Option) -> Result<()> { let compute_state: ComputeState; + let start_compute_span; let _this_entered; { let mut state_guard = self.state.lock().unwrap(); @@ -536,7 +540,7 @@ impl ComputeNode { // startup to be considered part of the /configure request. // // Similarly, if a trace ID was passed in env variables, attach it to the span. - _this_entered = { + start_compute_span = { // Temporarily enter the parent span, so that the new span becomes its child. if let Some(p) = state_guard.startup_span.take() { let _parent_entered = p.entered(); @@ -549,8 +553,8 @@ impl ComputeNode { } else { tracing::info_span!("start_compute") } - } - .entered(); + }; + _this_entered = start_compute_span.enter(); state_guard.set_status(ComputeStatus::Init, &self.state_changed); compute_state = state_guard.clone() @@ -567,23 +571,44 @@ impl ComputeNode { pspec.spec.remote_extensions, ); - // Launch remaining service threads - let _monitor_handle = launch_monitor(self); - let _configurator_handle = launch_configurator(self); + ////// PRE-STARTUP PHASE: things that need to be finished before we start the Postgres process + + // Collect all the tasks that must finish here + let mut pre_tasks = tokio::task::JoinSet::new(); + + // If there are any remote extensions in shared_preload_libraries, start downloading them + if pspec.spec.remote_extensions.is_some() { + let (this, spec) = (self.clone(), pspec.spec.clone()); + pre_tasks.spawn(async move { + this.download_preload_extensions(&spec) + .in_current_span() + .await + }); + } + + // Prepare pgdata directory. This downloads the basebackup, among other things. + { + let (this, cs) = (self.clone(), compute_state.clone()); + pre_tasks.spawn_blocking_child(move || this.prepare_pgdata(&cs)); + } // Resize swap to the desired size if the compute spec says so if let (Some(size_bytes), true) = (pspec.spec.swap_size_bytes, self.params.resize_swap_on_bind) { - // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion - // *before* starting postgres. - // - // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this - // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets - // OOM-killed during startup because swap wasn't available yet. - resize_swap(size_bytes).context("failed to resize swap")?; - let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display. - info!(%size_bytes, %size_mib, "resized swap"); + pre_tasks.spawn_blocking_child(move || { + // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion + // *before* starting postgres. + // + // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this + // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets + // OOM-killed during startup because swap wasn't available yet. + resize_swap(size_bytes).context("failed to resize swap")?; + let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display. + info!(%size_bytes, %size_mib, "resized swap"); + + Ok::<(), anyhow::Error>(()) + }); } // Set disk quota if the compute spec says so @@ -591,10 +616,15 @@ impl ComputeNode { pspec.spec.disk_quota_bytes, self.params.set_disk_quota_for_fs.as_ref(), ) { - set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) - .context("failed to set disk quota")?; - let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display. - info!(%disk_quota_bytes, %size_mib, "set disk quota"); + let disk_quota_fs_mountpoint = disk_quota_fs_mountpoint.clone(); + pre_tasks.spawn_blocking_child(move || { + set_disk_quota(disk_quota_bytes, &disk_quota_fs_mountpoint) + .context("failed to set disk quota")?; + let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display. + info!(%disk_quota_bytes, %size_mib, "set disk quota"); + + Ok::<(), anyhow::Error>(()) + }); } // tune pgbouncer @@ -628,37 +658,17 @@ impl ComputeNode { }); } - // This part is sync, because we need to download - // remote shared_preload_libraries before postgres start (if any) - if let Some(remote_extensions) = &pspec.spec.remote_extensions { - // First, create control files for all availale extensions - extension_server::create_control_files(remote_extensions, &self.params.pgbin); + // Launch remaining service threads + let _monitor_handle = launch_monitor(self); + let _configurator_handle = launch_configurator(self); - let library_load_start_time = Utc::now(); - let rt = tokio::runtime::Handle::current(); - let remote_ext_metrics = rt.block_on(self.prepare_preload_libraries(&pspec.spec))?; - - let library_load_time = Utc::now() - .signed_duration_since(library_load_start_time) - .to_std() - .unwrap() - .as_millis() as u64; - let mut state = self.state.lock().unwrap(); - state.metrics.load_ext_ms = library_load_time; - state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded; - state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size; - state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size; - info!( - "Loading shared_preload_libraries took {:?}ms", - library_load_time - ); - info!("{:?}", remote_ext_metrics); + // Wait for all the pre-tasks to finish before starting postgres + let rt = tokio::runtime::Handle::current(); + while let Some(res) = rt.block_on(pre_tasks.join_next()) { + res??; } - // Prepre pgdata directory. This downloads the basebackup, among other things. - self.prepare_pgdata(&compute_state)?; - - // Start Postgres + ////// START POSTGRES let start_time = Utc::now(); let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?; let postmaster_pid = pg_process.pid(); @@ -669,6 +679,7 @@ impl ComputeNode { let config_time = Utc::now(); if pspec.spec.mode == ComputeMode::Primary { self.configure_as_primary(&compute_state)?; + let conf = self.get_conn_conf(None); tokio::task::spawn_blocking(|| { let res = get_installed_extensions(conf); @@ -714,6 +725,39 @@ impl ComputeNode { Ok(()) } + #[instrument(skip_all)] + async fn download_preload_extensions(&self, spec: &ComputeSpec) -> Result<()> { + let remote_extensions = if let Some(remote_extensions) = &spec.remote_extensions { + remote_extensions + } else { + return Ok(()); + }; + + // First, create control files for all available extensions + extension_server::create_control_files(remote_extensions, &self.params.pgbin); + + let library_load_start_time = Utc::now(); + let remote_ext_metrics = self.prepare_preload_libraries(spec).await?; + + let library_load_time = Utc::now() + .signed_duration_since(library_load_start_time) + .to_std() + .unwrap() + .as_millis() as u64; + let mut state = self.state.lock().unwrap(); + state.metrics.load_ext_ms = library_load_time; + state.metrics.num_ext_downloaded = remote_ext_metrics.num_ext_downloaded; + state.metrics.largest_ext_size = remote_ext_metrics.largest_ext_size; + state.metrics.total_ext_download_size = remote_ext_metrics.total_ext_download_size; + info!( + "Loading shared_preload_libraries took {:?}ms", + library_load_time + ); + info!("{:?}", remote_ext_metrics); + + Ok(()) + } + /// Start the vm-monitor if directed to. The vm-monitor only runs on linux /// because it requires cgroups. fn start_vm_monitor(&self, disable_lfc_resizing: bool) -> StartVmMonitorResult { @@ -1574,21 +1618,6 @@ impl ComputeNode { } self.post_apply_config()?; - let conf = self.get_conn_conf(None); - tokio::task::spawn_blocking(|| { - let res = get_installed_extensions(conf); - match res { - Ok(extensions) => { - info!( - "[NEON_EXT_STAT] {}", - serde_json::to_string(&extensions) - .expect("failed to serialize extensions list") - ); - } - Err(err) => error!("could not get installed extensions: {err:?}"), - } - }); - Ok(()) } @@ -2030,3 +2059,26 @@ pub fn forward_termination_signal() { kill(pg_pid, Signal::SIGINT).ok(); } } + +// helper trait to call JoinSet::spawn_blocking(f), but propagates the current +// tracing span to the thread. +trait JoinSetExt { + fn spawn_blocking_child(&mut self, f: F) -> tokio::task::AbortHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send; +} + +impl JoinSetExt for tokio::task::JoinSet { + fn spawn_blocking_child(&mut self, f: F) -> tokio::task::AbortHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send, + { + let sp = tracing::Span::current(); + self.spawn_blocking(move || { + let _e = sp.enter(); + f() + }) + } +}