Move run_initdb to be async and guarded by max of 8 running tasks. Fixes #5895. Use tenant.cancel for cancellation (#5921)

## Problem
https://github.com/neondatabase/neon/issues/5895
This commit is contained in:
Shany Pozin
2023-11-28 16:49:31 +02:00
committed by GitHub
parent 1ab0cfc8cb
commit 8625466144

View File

@@ -19,6 +19,7 @@ use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::watch;
@@ -31,26 +32,6 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
use utils::sync::gate::Gate;
use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Command;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
@@ -84,14 +65,35 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::InitializationOrder;
use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
use crate::TEMP_FILE_SUFFIX;
use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use tokio::sync::Semaphore;
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
use toml_edit;
use utils::{
crashsafe,
@@ -403,6 +405,36 @@ pub enum CreateTimelineError {
Other(#[from] anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
enum InitdbError {
Other(anyhow::Error),
Cancelled,
Spawn(std::io::Result<()>),
Failed(std::process::ExitStatus, Vec<u8>),
}
impl fmt::Display for InitdbError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
InitdbError::Cancelled => write!(f, "Operation was cancelled"),
InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
InitdbError::Failed(status, stderr) => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
InitdbError::Other(e) => write!(f, "Error: {:?}", e),
}
}
}
impl From<std::io::Error> for InitdbError {
fn from(error: std::io::Error) -> Self {
InitdbError::Spawn(Err(error))
}
}
struct TenantDirectoryScan {
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
@@ -2922,8 +2954,8 @@ impl Tenant {
format!("Failed to remove already existing initdb directory: {pgdata_path}")
})?;
}
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
run_initdb(self.conf, &pgdata_path, pg_version)?;
// Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
@@ -3387,42 +3419,54 @@ fn rebase_directory(
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
fn run_initdb(
async fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Utf8Path,
pg_version: u32,
) -> anyhow::Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
cancel: &CancellationToken,
) -> Result<(), InitdbError> {
let initdb_bin_path = conf
.pg_bin_dir(pg_version)
.map_err(InitdbError::Other)?
.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
info!(
"running {} in {}, libdir: {}",
initdb_bin_path, initdb_target_dir, initdb_lib_dir,
);
let initdb_output = Command::new(&initdb_bin_path)
let _permit = INIT_DB_SEMAPHORE.acquire().await;
let initdb_command = tokio::process::Command::new(&initdb_bin_path)
.args(["-D", initdb_target_dir.as_ref()])
.args(["-U", &conf.superuser])
.args(["-E", "utf8"])
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", &initdb_lib_dir)
.env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
.stdout(Stdio::null())
.output()
.with_context(|| {
format!(
"failed to execute {} at target dir {}",
initdb_bin_path, initdb_target_dir,
)
})?;
if !initdb_output.status.success() {
bail!(
"initdb failed: '{}'",
String::from_utf8_lossy(&initdb_output.stderr)
);
.stdout(Stdio::piped())
.stderr(Stdio::piped())
// If the `select!` below doesn't finish the `wait_with_output`,
// let the task get `wait()`ed for asynchronously by tokio.
// This means there is a slim chance we can go over the INIT_DB_SEMAPHORE.
// TODO: fix for this is non-trivial, see
// https://github.com/neondatabase/neon/pull/5921#pullrequestreview-1750858021
//
.kill_on_drop(true)
.spawn()?;
tokio::select! {
initdb_output = initdb_command.wait_with_output() => {
let initdb_output = initdb_output?;
if !initdb_output.status.success() {
return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr));
}
}
_ = cancel.cancelled() => {
return Err(InitdbError::Cancelled);
}
}
Ok(())