From 8625466144f0fa3e08d52e675f66ed9ad0c37274 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Tue, 28 Nov 2023 16:49:31 +0200 Subject: [PATCH] Move run_initdb to be async and guarded by max of 8 running tasks. Fixes #5895. Use tenant.cancel for cancellation (#5921) ## Problem https://github.com/neondatabase/neon/issues/5895 --- pageserver/src/tenant.rs | 128 ++++++++++++++++++++++++++------------- 1 file changed, 86 insertions(+), 42 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a4d61f0951..0260905bd2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -19,6 +19,7 @@ use futures::FutureExt; use pageserver_api::models::TimelineState; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; +use std::fmt; use storage_broker::BrokerClientChannel; use tokio::runtime::Handle; use tokio::sync::watch; @@ -31,26 +32,6 @@ use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext; use utils::sync::gate::Gate; -use std::cmp::min; -use std::collections::hash_map::Entry; -use std::collections::BTreeSet; -use std::collections::HashMap; -use std::collections::HashSet; -use std::fmt::Debug; -use std::fmt::Display; -use std::fs; -use std::fs::File; -use std::io; -use std::ops::Bound::Included; -use std::process::Command; -use std::process::Stdio; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::sync::MutexGuard; -use std::sync::{Mutex, RwLock}; -use std::time::{Duration, Instant}; - use self::config::AttachedLocationConfig; use self::config::AttachmentMode; use self::config::LocationConf; @@ -84,14 +65,35 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart; use crate::tenant::storage_layer::DeltaLayer; use crate::tenant::storage_layer::ImageLayer; use crate::InitializationOrder; +use std::cmp::min; +use std::collections::hash_map::Entry; +use std::collections::BTreeSet; +use std::collections::HashMap; +use 
std::collections::HashSet; +use std::fmt::Debug; +use std::fmt::Display; +use std::fs; +use std::fs::File; +use std::io; +use std::ops::Bound::Included; +use std::process::Stdio; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::MutexGuard; +use std::sync::{Mutex, RwLock}; +use std::time::{Duration, Instant}; use crate::tenant::timeline::delete::DeleteTimelineFlow; use crate::tenant::timeline::uninit::cleanup_timeline_directory; use crate::virtual_file::VirtualFile; use crate::walredo::PostgresRedoManager; use crate::TEMP_FILE_SUFFIX; +use once_cell::sync::Lazy; pub use pageserver_api::models::TenantState; +use tokio::sync::Semaphore; +static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8)); use toml_edit; use utils::{ crashsafe, @@ -403,6 +405,36 @@ pub enum CreateTimelineError { Other(#[from] anyhow::Error), } +#[derive(thiserror::Error, Debug)] +enum InitdbError { + Other(anyhow::Error), + Cancelled, + Spawn(std::io::Result<()>), + Failed(std::process::ExitStatus, Vec<u8>), +} + +impl fmt::Display for InitdbError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + InitdbError::Cancelled => write!(f, "Operation was cancelled"), + InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e), + InitdbError::Failed(status, stderr) => write!( + f, + "Command failed with status {:?}: {}", + status, + String::from_utf8_lossy(stderr) + ), + InitdbError::Other(e) => write!(f, "Error: {:?}", e), + } + } +} + +impl From<std::io::Error> for InitdbError { + fn from(error: std::io::Error) -> Self { + InitdbError::Spawn(Err(error)) + } +} + struct TenantDirectoryScan { sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>, timelines_to_resume_deletion: Vec<(TimelineId, Option)>, @@ -2922,8 +2954,8 @@ impl Tenant { format!("Failed to remove already existing initdb directory: {pgdata_path}") })?; } - // Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path - 
run_initdb(self.conf, &pgdata_path, pg_version)?; + // Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path + run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?; // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it scopeguard::defer! { if let Err(e) = fs::remove_dir_all(&pgdata_path) { @@ -3387,42 +3419,54 @@ fn rebase_directory( /// Create the cluster temporarily in 'initdbpath' directory inside the repository /// to get bootstrap data for timeline initialization. -fn run_initdb( +async fn run_initdb( conf: &'static PageServerConf, initdb_target_dir: &Utf8Path, pg_version: u32, -) -> anyhow::Result<()> { - let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb"); - let initdb_lib_dir = conf.pg_lib_dir(pg_version)?; + cancel: &CancellationToken, +) -> Result<(), InitdbError> { + let initdb_bin_path = conf + .pg_bin_dir(pg_version) + .map_err(InitdbError::Other)? + .join("initdb"); + let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?; info!( "running {} in {}, libdir: {}", initdb_bin_path, initdb_target_dir, initdb_lib_dir, ); - let initdb_output = Command::new(&initdb_bin_path) + let _permit = INIT_DB_SEMAPHORE.acquire().await; + + let initdb_command = tokio::process::Command::new(&initdb_bin_path) .args(["-D", initdb_target_dir.as_ref()]) .args(["-U", &conf.superuser]) .args(["-E", "utf8"]) .arg("--no-instructions") - // This is only used for a temporary installation that is deleted shortly after, - // so no need to fsync it .arg("--no-sync") .env_clear() .env("LD_LIBRARY_PATH", &initdb_lib_dir) .env("DYLD_LIBRARY_PATH", &initdb_lib_dir) - .stdout(Stdio::null()) - .output() - .with_context(|| { - format!( - "failed to execute {} at target dir {}", - initdb_bin_path, initdb_target_dir, - ) - })?; - if !initdb_output.status.success() { - bail!( - "initdb failed: '{}'", - String::from_utf8_lossy(&initdb_output.stderr) - 
); + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + // If the `select!` below doesn't finish the `wait_with_output`, + // let the task get `wait()`ed for asynchronously by tokio. + // This means there is a slim chance we can go over the INIT_DB_SEMAPHORE. + // TODO: fix for this is non-trivial, see + // https://github.com/neondatabase/neon/pull/5921#pullrequestreview-1750858021 + // + .kill_on_drop(true) + .spawn()?; + + tokio::select! { + initdb_output = initdb_command.wait_with_output() => { + let initdb_output = initdb_output?; + if !initdb_output.status.success() { + return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr)); + } + } + _ = cancel.cancelled() => { + return Err(InitdbError::Cancelled); + } } Ok(())