diff --git a/Cargo.lock b/Cargo.lock
index f503b45577..133ca5def9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1388,6 +1388,7 @@ dependencies = [
  "tower-http",
  "tower-otel",
  "tracing",
+ "tracing-appender",
  "tracing-opentelemetry",
  "tracing-subscriber",
  "tracing-utils",
@@ -7934,11 +7935,12 @@ dependencies = [

 [[package]]
 name = "tracing-appender"
-version = "0.2.2"
+version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
+checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
 dependencies = [
  "crossbeam-channel",
+ "thiserror 1.0.69",
  "time",
  "tracing-subscriber",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index d8efabdadc..18236a81f5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -222,6 +222,7 @@ tracing-log = "0.2"
 tracing-opentelemetry = "0.31"
 tracing-serde = "0.2.0"
 tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
+tracing-appender = "0.2.3"
 try-lock = "0.2.5"
 test-log = { version = "0.2.17", default-features = false, features = ["log"] }
 twox-hash = { version = "1.6.3", default-features = false }
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index 496471acc7..558760b0ad 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -62,6 +62,7 @@ tokio-stream.workspace = true
 tonic.workspace = true
 tower-otel.workspace = true
 tracing.workspace = true
+tracing-appender.workspace = true
 tracing-opentelemetry.workspace = true
 tracing-subscriber.workspace = true
 tracing-utils.workspace = true
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 04723d6f3d..ee8a504429 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -51,6 +51,7 @@ use compute_tools::compute::{
 use compute_tools::extension_server::get_pg_version_string;
 use compute_tools::logger::*;
 use compute_tools::params::*;
+use compute_tools::pg_isready::get_pg_isready_bin;
 use compute_tools::spec::*;
 use rlimit::{Resource, setrlimit};
 use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
@@ -194,7 +195,12 @@ fn main() -> Result<()> {
         .build()?;
     let _rt_guard = runtime.enter();

-    let tracing_provider = init(cli.dev)?;
+    let mut log_dir = None;
+    if cli.lakebase_mode {
+        log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
+    }
+
+    let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;

     // enable core dumping for all child processes
     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
@@ -226,6 +232,8 @@ fn main() -> Result<()> {
                 cli.installed_extensions_collection_interval,
             )),
             pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
+            pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
+            instance_id: std::env::var("INSTANCE_ID").ok(),
             lakebase_mode: cli.lakebase_mode,
         },
         config,
@@ -238,8 +246,14 @@ fn main() -> Result<()> {
     deinit_and_exit(tracing_provider, exit_code);
 }

-fn init(dev_mode: bool) -> Result<Option<tracing_utils::Provider>> {
-    let provider = init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
+fn init(
+    dev_mode: bool,
+    log_dir: Option<String>,
+) -> Result<(
+    Option<tracing_utils::Provider>,
+    Option<tracing_appender::non_blocking::WorkerGuard>,
+)> {
+    let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;

     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
     thread::spawn(move || {
@@ -250,7 +264,7 @@ fn init(dev_mode: bool) -> Result<Option<tracing_utils::Provider>> {

     info!("compute build_tag: {}", &BUILD_TAG.to_string());

-    Ok(provider)
+    Ok((provider, file_logs_guard))
 }
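Note on `_file_logs_guard` above: `tracing_appender::non_blocking` returns a `WorkerGuard` that flushes and shuts down the background writer thread when dropped, which is why `init` threads the guard back up to `main` instead of discarding it. A minimal, self-contained sketch of the pattern (the directory and file prefix are illustrative, not from this patch):

    use tracing_appender::{non_blocking, rolling};

    fn main() {
        // Daily-rotated file: /tmp/logs/example.<yyyy-MM-dd>
        let appender = rolling::daily("/tmp/logs", "example");
        let (writer, _guard) = non_blocking(appender);
        tracing_subscriber::fmt().with_writer(writer).init();
        tracing::info!("buffered by the worker thread");
        // `_guard` lives to the end of main; dropping it flushes the buffer.
        // Binding it to `_` instead would drop it immediately and lose logs.
    }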

 fn get_config(cli: &Cli) -> Result<ComputeConfig> {
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index b4d7a6fca9..56bf7b8632 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -113,10 +113,12 @@ pub struct ComputeNodeParams {
     /// Interval for installed extensions collection
     pub installed_extensions_collection_interval: Arc<AtomicU64>,
-
+    /// Hadron instance ID of the compute node.
+    pub instance_id: Option<String>,
     /// Timeout of PG compute startup in the Init state.
     pub pg_init_timeout: Option<Duration>,
-
+    // Path to the `pg_isready` binary.
+    pub pg_isready_bin: String,
     pub lakebase_mode: bool,
 }
@@ -486,6 +488,7 @@ impl ComputeNode {
             port: this.params.external_http_port,
             config: this.compute_ctl_config.clone(),
             compute_id: this.params.compute_id.clone(),
+            instance_id: this.params.instance_id.clone(),
         }
         .launch(&this);
@@ -1785,6 +1788,34 @@ impl ComputeNode {
         Ok::<(), anyhow::Error>(())
     }

+    // Signal the configurator to refresh the configuration by pulling a new spec from the HCC.
+    // Note that this merely triggers a notification on a condition variable that the configurator
+    // thread waits on. The configurator thread (in configurator.rs) pulls the new spec from the
+    // HCC and applies it.
+    pub async fn signal_refresh_configuration(&self) -> Result<()> {
+        let states_allowing_configuration_refresh = [
+            ComputeStatus::Running,
+            ComputeStatus::Failed,
+            // ComputeStatus::RefreshConfigurationPending,
+        ];
+
+        let state = self.state.lock().expect("state lock poisoned");
+        if states_allowing_configuration_refresh.contains(&state.status) {
+            // state.status = ComputeStatus::RefreshConfigurationPending;
+            self.state_changed.notify_all();
+            Ok(())
+        } else if state.status == ComputeStatus::Init {
+            // If the compute is in the Init state, we can't refresh the configuration
+            // immediately, but we should be able to do so soon.
+            Ok(())
+        } else {
+            Err(anyhow::anyhow!(
+                "Cannot refresh compute configuration in state {:?}",
+                state.status
+            ))
+        }
+    }
+
     // Wrapped this around `pg_ctl reload`, but right now we don't use
     // `pg_ctl` for start / stop.
     #[instrument(skip_all)]
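`signal_refresh_configuration` only pokes the `state_changed` condition variable; the configurator thread (configurator.rs, not part of this diff) re-checks the status under the `state` mutex after each wakeup. A minimal sketch of that mutex/condvar pairing, with simplified types (the real code guards `ComputeState`, not an integer):

    use std::sync::{Condvar, Mutex};

    // Waiter side: block until the shared value changes. The predicate is
    // re-checked in a loop because condvar wakeups can be spurious.
    fn wait_for_change(status: &Mutex<u32>, changed: &Condvar) -> u32 {
        let mut guard = status.lock().unwrap();
        let seen = *guard;
        while *guard == seen {
            guard = changed.wait(guard).unwrap();
        }
        *guard
    }

    // Signaler side, mirroring signal_refresh_configuration(): take the lock,
    // then wake every waiter so each can re-evaluate its predicate.
    fn signal(status: &Mutex<u32>, changed: &Condvar) {
        let _guard = status.lock().unwrap();
        changed.notify_all();
    }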
diff --git a/compute_tools/src/http/middleware/authorize.rs b/compute_tools/src/http/middleware/authorize.rs
index a82f46e062..407833bb0e 100644
--- a/compute_tools/src/http/middleware/authorize.rs
+++ b/compute_tools/src/http/middleware/authorize.rs
@@ -16,13 +16,29 @@ use crate::http::JsonResponse;
 #[derive(Clone, Debug)]
 pub(in crate::http) struct Authorize {
     compute_id: String,
+    // BEGIN HADRON
+    // Hadron instance ID. Only set if this is a Lakebase V1 (a.k.a. Hadron) instance.
+    instance_id: Option<String>,
+    // END HADRON
     jwks: JwkSet,
     validation: Validation,
 }

 impl Authorize {
-    pub fn new(compute_id: String, jwks: JwkSet) -> Self {
+    pub fn new(compute_id: String, instance_id: Option<String>, jwks: JwkSet) -> Self {
         let mut validation = Validation::new(Algorithm::EdDSA);
+
+        // BEGIN HADRON
+        let use_rsa = jwks.keys.iter().any(|jwk| {
+            jwk.common
+                .key_algorithm
+                .is_some_and(|alg| alg == jsonwebtoken::jwk::KeyAlgorithm::RS256)
+        });
+        if use_rsa {
+            validation = Validation::new(Algorithm::RS256);
+        }
+        // END HADRON
+
         validation.validate_exp = true;
         // Unused by the control plane
         validation.validate_nbf = false;
@@ -34,6 +50,7 @@ impl Authorize {

         Self {
             compute_id,
+            instance_id,
             jwks,
             validation,
         }
@@ -47,10 +64,20 @@ impl<B> AsyncAuthorizeRequest<B> for Authorize {

     fn authorize(&mut self, mut request: Request<B>) -> Self::Future {
         let compute_id = self.compute_id.clone();
+        let is_hadron_instance = self.instance_id.is_some();
         let jwks = self.jwks.clone();
         let validation = self.validation.clone();

         Box::pin(async move {
+            // BEGIN HADRON
+            // In Hadron deployments the "external" HTTP endpoint on compute_ctl can only be
+            // accessed by trusted components (enforced by dblet network policy), so we can bypass
+            // all auth here.
+            if is_hadron_instance {
+                return Ok(request);
+            }
+            // END HADRON
+
             let TypedHeader(Authorization(bearer)) = request
                 .extract_parts::<TypedHeader<Authorization<Bearer>>>()
                 .await
diff --git a/compute_tools/src/http/routes/hadron_liveness_probe.rs b/compute_tools/src/http/routes/hadron_liveness_probe.rs
new file mode 100644
index 0000000000..4f66b6b139
--- /dev/null
+++ b/compute_tools/src/http/routes/hadron_liveness_probe.rs
@@ -0,0 +1,34 @@
+use crate::pg_isready::pg_isready;
+use crate::{compute::ComputeNode, http::JsonResponse};
+use axum::{extract::State, http::StatusCode, response::Response};
+use std::sync::Arc;
+
+/// NOTE: NOT ENABLED YET
+/// Detect whether the compute is alive.
+/// Called by the liveness probe of the compute container.
+pub(in crate::http) async fn hadron_liveness_probe(
+    State(compute): State<Arc<ComputeNode>>,
+) -> Response {
+    let port = match compute.params.connstr.port() {
+        Some(port) => port,
+        None => {
+            return JsonResponse::error(
+                StatusCode::INTERNAL_SERVER_ERROR,
+                "Failed to get the port from the connection string",
+            );
+        }
+    };
+    match pg_isready(&compute.params.pg_isready_bin, port) {
+        Ok(_) => {
+            // The connection succeeded, so the compute is alive.
+            // Return a 200 OK response.
+            JsonResponse::success(StatusCode::OK, "ok")
+        }
+        Err(e) => {
+            tracing::error!("Hadron liveness probe failed: {}", e);
+            // The connection failed, so the compute is not alive.
+            // Return a 500 Internal Server Error response.
+            JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
+        }
+    }
+}
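The algorithm selection in `Authorize::new` above can be exercised in isolation. A sketch of the same logic as a free function (the types come straight from the jsonwebtoken crate; nothing else is assumed):

    use jsonwebtoken::jwk::{JwkSet, KeyAlgorithm};
    use jsonwebtoken::{Algorithm, Validation};

    // Default to EdDSA; switch to RS256 when any key in the set advertises it,
    // matching the BEGIN/END HADRON block in Authorize::new.
    fn pick_validation(jwks: &JwkSet) -> Validation {
        let use_rsa = jwks.keys.iter().any(|jwk| {
            jwk.common
                .key_algorithm
                .is_some_and(|alg| alg == KeyAlgorithm::RS256)
        });
        Validation::new(if use_rsa {
            Algorithm::RS256
        } else {
            Algorithm::EdDSA
        })
    }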
diff --git a/compute_tools/src/http/routes/mod.rs b/compute_tools/src/http/routes/mod.rs
index dd71f663eb..c0f68701c6 100644
--- a/compute_tools/src/http/routes/mod.rs
+++ b/compute_tools/src/http/routes/mod.rs
@@ -10,11 +10,13 @@ pub(in crate::http) mod extension_server;
 pub(in crate::http) mod extensions;
 pub(in crate::http) mod failpoints;
 pub(in crate::http) mod grants;
+pub(in crate::http) mod hadron_liveness_probe;
 pub(in crate::http) mod insights;
 pub(in crate::http) mod lfc;
 pub(in crate::http) mod metrics;
 pub(in crate::http) mod metrics_json;
 pub(in crate::http) mod promote;
+pub(in crate::http) mod refresh_configuration;
 pub(in crate::http) mod status;
 pub(in crate::http) mod terminate;
diff --git a/compute_tools/src/http/routes/refresh_configuration.rs b/compute_tools/src/http/routes/refresh_configuration.rs
new file mode 100644
index 0000000000..d00f5a285a
--- /dev/null
+++ b/compute_tools/src/http/routes/refresh_configuration.rs
@@ -0,0 +1,34 @@
+// This file is added by Hadron
+
+use std::sync::Arc;
+
+use axum::{
+    extract::State,
+    response::{IntoResponse, Response},
+};
+use http::StatusCode;
+use tracing::debug;
+
+use crate::compute::ComputeNode;
+// use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
+use crate::http::JsonResponse;
+
+// The /refresh_configuration POST method nudges compute_ctl to pull a new spec from the HCC and
+// attempt to reconfigure Postgres with it. The method does not wait for the reconfiguration to
+// complete. Rather, it simply delivers a signal that causes the configuration to be reloaded in
+// a best-effort manner. Invoking this method does not guarantee that a reconfiguration will
+// occur. The caller should keep sending this request while it believes that the compute
+// configuration is out of date.
+pub(in crate::http) async fn refresh_configuration(
+    State(compute): State<Arc<ComputeNode>>,
+) -> Response {
+    debug!("serving /refresh_configuration POST request");
+    // POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
+    match compute.signal_refresh_configuration().await {
+        Ok(_) => StatusCode::OK.into_response(),
+        Err(e) => {
+            tracing::error!("error handling /refresh_configuration request: {}", e);
+            JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
+        }
+    }
+}
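Because `/refresh_configuration` is best-effort, the comment above tells callers to keep re-sending while they believe the spec is stale. A hypothetical caller-side sketch (the base URL, retry interval, and use of reqwest are illustrative, not part of this patch):

    use std::time::Duration;

    // Nudge compute_ctl until the signal is accepted (HTTP 200). A 200 only
    // means the signal was delivered; a real caller would separately verify
    // that the compute picked up the new spec.
    async fn nudge_refresh(client: &reqwest::Client, base: &str) -> anyhow::Result<()> {
        loop {
            let resp = client
                .post(format!("{base}/refresh_configuration"))
                .send()
                .await?;
            if resp.status().is_success() {
                return Ok(());
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }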
diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs
index f0fbca8263..9901a6de07 100644
--- a/compute_tools/src/http/server.rs
+++ b/compute_tools/src/http/server.rs
@@ -27,6 +27,7 @@ use super::{
     },
 };
 use crate::compute::ComputeNode;
+use crate::http::routes::{hadron_liveness_probe, refresh_configuration};

 /// `compute_ctl` has two servers: internal and external. The internal server
 /// binds to the loopback interface and handles communication from clients on
@@ -43,6 +44,7 @@ pub enum Server {
         port: u16,
         config: ComputeCtlConfig,
         compute_id: String,
+        instance_id: Option<String>,
     },
 }

@@ -67,7 +69,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
                     post(extension_server::download_extension),
                 )
                 .route("/extensions", post(extensions::install_extension))
-                .route("/grants", post(grants::add_grant));
+                .route("/grants", post(grants::add_grant))
+                // Hadron: compute-initiated configuration refresh
+                .route(
+                    "/refresh_configuration",
+                    post(refresh_configuration::refresh_configuration),
+                );

             // Add in any testing support
             if cfg!(feature = "testing") {
@@ -79,7 +86,10 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
                 router
             }
             Server::External {
-                config, compute_id, ..
+                config,
+                compute_id,
+                instance_id,
+                ..
             } => {
                 let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
                     .route("/metrics", get(metrics::get_metrics))
@@ -100,8 +110,13 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
                     .route("/metrics.json", get(metrics_json::get_metrics))
                     .route("/status", get(status::get_status))
                     .route("/terminate", post(terminate::terminate))
+                    .route(
+                        "/hadron_liveness_probe",
+                        get(hadron_liveness_probe::hadron_liveness_probe),
+                    )
                     .layer(AsyncRequireAuthorizationLayer::new(Authorize::new(
                         compute_id.clone(),
+                        instance_id.clone(),
                         config.jwks.clone(),
                     )));
diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs
index 90e1a17be4..5f60b711c8 100644
--- a/compute_tools/src/installed_extensions.rs
+++ b/compute_tools/src/installed_extensions.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;

 use anyhow::Result;
 use compute_api::responses::{InstalledExtension, InstalledExtensions};
+use once_cell::sync::Lazy;
 use tokio_postgres::error::Error as PostgresError;
 use tokio_postgres::{Client, Config, NoTls};

@@ -119,3 +120,7 @@ pub async fn get_installed_extensions(
         extensions: extensions_map.into_values().collect(),
     })
 }
+
+pub fn initialize_metrics() {
+    Lazy::force(&INSTALLED_EXTENSIONS);
+}
diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs
index 5ffa2f004a..85a6f955d9 100644
--- a/compute_tools/src/lib.rs
+++ b/compute_tools/src/lib.rs
@@ -25,6 +25,7 @@ mod migration;
 pub mod monitor;
 pub mod params;
 pub mod pg_helpers;
+pub mod pg_isready;
 pub mod pgbouncer;
 pub mod rsyslog;
 pub mod spec;
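`initialize_metrics` works because statics wrapped in `Lazy` run their initializer on first dereference; until then, any metrics they register are invisible to a scrape. Forcing the `Lazy` at startup makes the series appear (at zero) before the first real update. A tiny illustration of the mechanism (the value type is simplified, not the actual metric):

    use once_cell::sync::Lazy;

    static REGISTRY_ENTRY: Lazy<Vec<u32>> = Lazy::new(|| {
        println!("initializer runs exactly once, on first access");
        vec![0; 4]
    });

    fn initialize_metrics() {
        // Evaluate the Lazy eagerly so the value exists from startup onward.
        Lazy::force(&REGISTRY_ENTRY);
    }

    fn main() {
        initialize_metrics(); // prints once
        assert_eq!(REGISTRY_ENTRY.len(), 4); // already initialized, no reprint
    }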
diff --git a/compute_tools/src/logger.rs b/compute_tools/src/logger.rs
index cd076472a6..83e666223c 100644
--- a/compute_tools/src/logger.rs
+++ b/compute_tools/src/logger.rs
@@ -1,7 +1,10 @@
 use std::collections::HashMap;
+use std::sync::{LazyLock, RwLock};

+use tracing::Subscriber;
 use tracing::info;
-use tracing_subscriber::layer::SubscriberExt;
+use tracing_appender;
 use tracing_subscriber::prelude::*;
+use tracing_subscriber::{fmt, layer::SubscriberExt, registry::LookupSpan};

 /// Initialize logging to stderr, and OpenTelemetry tracing and exporter.
 ///
@@ -15,16 +18,44 @@ use tracing_subscriber::prelude::*;
 ///
 pub fn init_tracing_and_logging(
     default_log_level: &str,
-) -> anyhow::Result<Option<tracing_utils::Provider>> {
+    log_dir_opt: &Option<String>,
+) -> anyhow::Result<(
+    Option<tracing_utils::Provider>,
+    Option<tracing_appender::non_blocking::WorkerGuard>,
+)> {
     // Initialize Logging
     let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
         .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level));

+    // Standard output streams
     let fmt_layer = tracing_subscriber::fmt::layer()
         .with_ansi(false)
         .with_target(false)
         .with_writer(std::io::stderr);

+    // Logs with file rotation. Files are named `$log_dir/pgcctl.yyyy-MM-dd`.
+    let (json_to_file_layer, _file_logs_guard) = if let Some(log_dir) = log_dir_opt {
+        std::fs::create_dir_all(log_dir)?;
+        let file_logs_appender = tracing_appender::rolling::RollingFileAppender::builder()
+            .rotation(tracing_appender::rolling::Rotation::DAILY)
+            .filename_prefix("pgcctl")
+            // The library appends to existing files, so we keep files for up to 2 days even on
+            // restart loops. At minimum, log-daemon will have 1 day to detect and upload a file
+            // (if it was created right before midnight).
+            .max_log_files(2)
+            .build(log_dir)
+            .expect("Initializing rolling file appender should succeed");
+        let (file_logs_writer, _file_logs_guard) =
+            tracing_appender::non_blocking(file_logs_appender);
+        let json_to_file_layer = tracing_subscriber::fmt::layer()
+            .with_ansi(false)
+            .with_target(false)
+            .event_format(PgJsonLogShapeFormatter)
+            .with_writer(file_logs_writer);
+        (Some(json_to_file_layer), Some(_file_logs_guard))
+    } else {
+        (None, None)
+    };
+
     // Initialize OpenTelemetry
     let provider =
         tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default());
@@ -35,12 +66,13 @@ pub fn init_tracing_and_logging(
         .with(env_filter)
         .with(otlp_layer)
         .with(fmt_layer)
+        .with(json_to_file_layer)
         .init();

     tracing::info!("logging and tracing started");

     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();

-    Ok(provider)
+    Ok((provider, _file_logs_guard))
 }

 /// Replace all newline characters with a special character to make it
@@ -95,3 +127,157 @@ pub fn startup_context_from_env() -> Option<opentelemetry::Context> {
         None
     }
 }
+
+/// Track relevant IDs
+const UNKNOWN_IDS: &str = r#""pg_instance_id": "", "pg_compute_id": """#;
+static IDS: LazyLock<RwLock<String>> = LazyLock::new(|| RwLock::new(UNKNOWN_IDS.to_string()));
+
+pub fn update_ids(instance_id: &Option<String>, compute_id: &Option<String>) -> anyhow::Result<()> {
+    let ids = format!(
+        r#""pg_instance_id": "{}", "pg_compute_id": "{}""#,
+        instance_id.as_ref().map(|s| s.as_str()).unwrap_or_default(),
+        compute_id.as_ref().map(|s| s.as_str()).unwrap_or_default()
+    );
+    let mut guard = IDS
+        .write()
+        .map_err(|e| anyhow::anyhow!("Log set IDs rwlock poisoned: {}", e))?;
+    *guard = ids;
+    Ok(())
+}
+
+/// Massage compute_ctl logs into the PG JSON log shape so we can use the same Lumberjack setup.
+struct PgJsonLogShapeFormatter;
+impl<S, N> fmt::format::FormatEvent<S, N> for PgJsonLogShapeFormatter
+where
+    S: Subscriber + for<'a> LookupSpan<'a>,
+    N: for<'a> fmt::format::FormatFields<'a> + 'static,
+{
+    fn format_event(
+        &self,
+        ctx: &fmt::FmtContext<'_, S, N>,
+        mut writer: fmt::format::Writer<'_>,
+        event: &tracing::Event<'_>,
+    ) -> std::fmt::Result {
+        // Format values from the event's metadata, and open the message string
+        let metadata = event.metadata();
+        {
+            let ids_guard = IDS.read();
+            let ids = ids_guard
+                .as_ref()
+                .map(|guard| guard.as_str())
+                // Suppress the error so that we don't lose all uploaded/file logs if something
+                // goes badly wrong. We would notice the missing IDs.
+                .unwrap_or(UNKNOWN_IDS);
+            write!(
+                &mut writer,
+                r#"{{"timestamp": "{}", "error_severity": "{}", "file_name": "{}", "backend_type": "compute_ctl_self", {}, "message": "#,
+                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S%.3f GMT"),
+                metadata.level(),
+                metadata.target(),
+                ids
+            )?;
+        }
+
+        let mut message = String::new();
+        let message_writer = fmt::format::Writer::new(&mut message);
+
+        // Gather the message
+        ctx.field_format().format_fields(message_writer, event)?;
+
+        // TODO: any better options than to copy-paste this OSS span formatter?
+        // impl FormatEvent for Format
+        // https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/trait.FormatEvent.html#impl-FormatEvent%3CS,+N%3E-for-Format%3CFull,+T%3E
+
+        // Write the message, close the brace, and end the line
+        writeln!(writer, "{}}}", serde_json::to_string(&message).unwrap())
+    }
+}
+
+#[cfg(feature = "testing")]
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::{cell::RefCell, io};
+
+    // Use thread_local! instead of Mutex for test isolation
+    thread_local! {
+        static WRITER_OUTPUT: RefCell<String> = const { RefCell::new(String::new()) };
+    }
+
+    #[derive(Clone, Default)]
+    struct StaticStringWriter;
+
+    impl io::Write for StaticStringWriter {
+        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+            let output = String::from_utf8(buf.to_vec()).expect("Invalid UTF-8 in test output");
+            WRITER_OUTPUT.with(|s| s.borrow_mut().push_str(&output));
+            Ok(buf.len())
+        }
+
+        fn flush(&mut self) -> io::Result<()> {
+            Ok(())
+        }
+    }
+
+    impl fmt::MakeWriter<'_> for StaticStringWriter {
+        type Writer = Self;
+
+        fn make_writer(&self) -> Self::Writer {
+            Self
+        }
+    }
+
+    #[test]
+    fn test_log_pg_json_shape_formatter() {
+        // Use a scoped subscriber to prevent global state pollution
+        let subscriber = tracing_subscriber::registry().with(
+            tracing_subscriber::fmt::layer()
+                .with_ansi(false)
+                .with_target(false)
+                .event_format(PgJsonLogShapeFormatter)
+                .with_writer(StaticStringWriter),
+        );
+
+        let _ = update_ids(&Some("000".to_string()), &Some("111".to_string()));
+
+        // Clear any previous test state
+        WRITER_OUTPUT.with(|s| s.borrow_mut().clear());
+
+        let messages = [
+            "test message",
+            r#"json escape check: name="BatchSpanProcessor.Flush.ExportError" reason="Other(reqwest::Error { kind: Request, url: \"http://localhost:4318/v1/traces\", source: hyper_util::client::legacy::Error(Connect, ConnectError(\"tcp connect error\", Os { code: 111, kind: ConnectionRefused, message: \"Connection refused\" })) })" Failed during the export process"#,
+        ];
+
+        tracing::subscriber::with_default(subscriber, || {
+            for message in messages {
+                tracing::info!(message);
+            }
+        });
+        tracing::info!("not test message");
+
+        // Get the captured output
+        let output = WRITER_OUTPUT.with(|s| s.borrow().clone());
+
+        let json_strings: Vec<&str> = output.lines().collect();
+        assert_eq!(
+            json_strings.len(),
+            messages.len(),
+            "Log didn't have the expected number of JSON strings."
+        );
+
+        let json_string_shape_regex = regex::Regex::new(
+            r#"\{"timestamp": "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3} GMT", "error_severity": "INFO", "file_name": ".+", "backend_type": "compute_ctl_self", "pg_instance_id": "000", "pg_compute_id": "111", "message": ".+"\}"#,
+        )
+        .unwrap();
+
+        for (i, expected_message) in messages.iter().enumerate() {
+            let json_string = json_strings[i];
+            assert!(
+                json_string_shape_regex.is_match(json_string),
+                "JSON log didn't match expected pattern:\n{json_string}",
+            );
+            let parsed_json: serde_json::Value = serde_json::from_str(json_string).unwrap();
+            let actual_message = parsed_json["message"].as_str().unwrap();
+            assert_eq!(*expected_message, actual_message);
+        }
+    }
+}
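For reference, the line shape `PgJsonLogShapeFormatter` produces is exactly what the test's regex asserts. A sketch of the expected output under assumed IDs and timestamp (values fabricated; the real output is a single line, wrapped here for readability):

    // update_ids(&Some("inst-1".to_string()), &Some("comp-1".to_string()))?;
    // tracing::info!("hello");
    //
    // ...appends one PG-shaped JSON line to the rolling file:
    // {"timestamp": "2025-01-01 00:00:00.000 GMT", "error_severity": "INFO",
    //  "file_name": "compute_ctl", "backend_type": "compute_ctl_self",
    //  "pg_instance_id": "inst-1", "pg_compute_id": "comp-1", "message": "hello"}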
diff --git a/compute_tools/src/pg_isready.rs b/compute_tools/src/pg_isready.rs
new file mode 100644
index 0000000000..76c45d6b0a
--- /dev/null
+++ b/compute_tools/src/pg_isready.rs
@@ -0,0 +1,30 @@
+use anyhow::{Context, anyhow};
+
+// Run `/usr/local/bin/pg_isready -p {port}` to check the connectivity of PG.
+// Success means PG is listening on the port and accepting connections.
+// Note that PG does not need to authenticate the connection, nor reserve a connection quota for it.
+// See https://www.postgresql.org/docs/current/app-pg-isready.html
+pub fn pg_isready(bin: &str, port: u16) -> anyhow::Result<()> {
+    let child_result = std::process::Command::new(bin)
+        .arg("-p")
+        .arg(port.to_string())
+        .spawn();
+
+    child_result
+        .context("spawn() failed")
+        .and_then(|mut child| child.wait().context("wait() failed"))
+        .and_then(|status| match status.success() {
+            true => Ok(()),
+            false => Err(anyhow!("process exited with {status}")),
+        })
+        // wrap any prior error with the overall context that we couldn't run the command
+        .with_context(|| format!("could not run `{bin} --port {port}`"))
+}
+
+// It's safe to assume pg_isready is in the same directory as postgres,
+// because it is a PG utility binary installed alongside postgres.
+pub fn get_pg_isready_bin(pgbin: &str) -> String {
+    let split = pgbin.split("/").collect::<Vec<&str>>();
+    split[0..split.len() - 1].join("/") + "/pg_isready"
+}
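A usage sketch for the two helpers above; the expected strings follow directly from the path manipulation, and one corner case is worth noting (a `pgbin` with no '/' yields "/pg_isready", so callers should pass an absolute path, as compute_ctl does via --pgbin):

    fn main() -> anyhow::Result<()> {
        use compute_tools::pg_isready::{get_pg_isready_bin, pg_isready};

        // Sibling-path derivation: drop the last segment, append pg_isready.
        assert_eq!(
            get_pg_isready_bin("/usr/local/bin/postgres"),
            "/usr/local/bin/pg_isready"
        );
        // Corner case: no directory component.
        assert_eq!(get_pg_isready_bin("postgres"), "/pg_isready");

        // Probe a locally running Postgres (port illustrative).
        pg_isready(&get_pg_isready_bin("/usr/local/bin/postgres"), 5432)?;
        Ok(())
    }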