diff --git a/Cargo.lock b/Cargo.lock index ddb10352b8..69a8fa19ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -497,8 +497,10 @@ dependencies = [ "chrono", "clap 3.2.16", "env_logger", + "futures", "hyper", "log", + "notify", "postgres", "regex", "serde", @@ -1072,6 +1074,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.21" @@ -1493,6 +1504,26 @@ dependencies = [ "str_stack", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -1552,6 +1583,26 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kqueue" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "kstring" version = "1.0.6" @@ -1797,6 +1848,24 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a" +dependencies = [ + "bitflags", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "mio", + "walkdir", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -4142,6 +4211,7 @@ dependencies = [ "bstr", "bytes", "chrono", + "crossbeam-utils", "either", "fail", "hashbrown", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index b13f7f191d..43cf7ae2dd 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -8,8 +8,10 @@ anyhow = "1.0" chrono = "0.4" clap = "3.0" env_logger = "0.9" +futures = "0.3.13" hyper = { version = "0.14", features = ["full"] } log = { version = "0.4", features = ["std", "serde"] } +notify = "5.0.0" postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } regex = "1" serde = { version = "1.0", features = ["derive"] } diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 8802dae639..769dbfac73 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -1,16 +1,19 @@ use std::fmt::Write; +use std::fs; use std::fs::File; use std::io::{BufRead, BufReader}; use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::Child; -use std::{fs, thread, time}; +use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use postgres::{Client, Transaction}; use serde::Deserialize; -const POSTGRES_WAIT_TIMEOUT: u64 = 60 * 1000; // milliseconds +use notify::{RecursiveMode, Watcher}; + +const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds /// Rust representation of Postgres role info with only those fields /// that matter for us. @@ -233,29 +236,63 @@ pub fn get_existing_dbs(client: &mut Client) -> Result> { /// 'ready'. pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { let pid_path = pgdata.join("postmaster.pid"); - let mut slept: u64 = 0; // ms - let pause = time::Duration::from_millis(100); + // PostgreSQL writes line "ready" to the postmaster.pid file, when it has + // completed initialization and is ready to accept connections. We want to + // react quickly and perform the rest of our initialization as soon as + // PostgreSQL starts accepting connections. Use 'notify' to be notified + // whenever the PID file is changed, and whenever it changes, read it to + // check if it's now "ready". + // + // You cannot actually watch a file before it exists, so we first watch the + // data directory, and once the postmaster.pid file appears, we switch to + // watch the file instead. We also wake up every 100 ms to poll, just in + // case we miss some events for some reason. Not strictly necessary, but + // better safe than sorry. + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = notify::recommended_watcher(move |res| { + let _ = tx.send(res); + })?; + watcher.watch(pgdata, RecursiveMode::NonRecursive)?; + + let started_at = Instant::now(); + let mut postmaster_pid_seen = false; loop { - // Sleep POSTGRES_WAIT_TIMEOUT at max - if slept >= POSTGRES_WAIT_TIMEOUT { - bail!("timed out while waiting for Postgres to start"); - } - if let Ok(Some(status)) = pg.try_wait() { // Postgres exited, that is not what we expected, bail out earlier. let code = status.code().unwrap_or(-1); bail!("Postgres exited unexpectedly with code {}", code); } + let res = rx.recv_timeout(Duration::from_millis(100)); + log::debug!("woken up by notify: {res:?}"); + // If there are multiple events in the channel already, we only need to be + // check once. Swallow the extra events before we go ahead to check the + // pid file. + while let Ok(res) = rx.try_recv() { + log::debug!("swallowing extra event: {res:?}"); + } + // Check that we can open pid file first. if let Ok(file) = File::open(&pid_path) { + if !postmaster_pid_seen { + log::debug!("postmaster.pid appeared"); + watcher + .unwatch(pgdata) + .expect("Failed to remove pgdata dir watch"); + watcher + .watch(&pid_path, RecursiveMode::NonRecursive) + .expect("Failed to add postmaster.pid file watch"); + postmaster_pid_seen = true; + } + let file = BufReader::new(file); let last_line = file.lines().last(); // Pid file could be there and we could read it, but it could be empty, for example. if let Some(Ok(line)) = last_line { let status = line.trim(); + log::debug!("last line of postmaster.pid: {status:?}"); // Now Postgres is ready to accept connections if status == "ready" { @@ -264,8 +301,11 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> { } } - thread::sleep(pause); - slept += 100; + // Give up after POSTGRES_WAIT_TIMEOUT. + let duration = started_at.elapsed(); + if duration >= POSTGRES_WAIT_TIMEOUT { + bail!("timed out while waiting for Postgres to start"); + } } log::info!("PostgreSQL is now running, continuing to configure it"); diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index f37a42945e..6977665c7d 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -19,6 +19,7 @@ anyhow = { version = "1", features = ["backtrace", "std"] } bstr = { version = "0.2", features = ["lazy_static", "regex-automata", "serde", "serde1", "serde1-nostd", "std", "unicode"] } bytes = { version = "1", features = ["serde", "std"] } chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "std", "time", "winapi"] } +crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] } either = { version = "1", features = ["use_std"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }