diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 28e5f997bb..7056ef4f90 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -26,13 +26,12 @@ use serde::Serialize;
 use std::collections::VecDeque;
 use std::io;
 use std::io::prelude::*;
-use std::io::{Error, ErrorKind};
 use std::ops::{Deref, DerefMut};
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::os::unix::prelude::CommandExt;
 use std::process::Stdio;
 use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
-use std::sync::{Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, MutexGuard, RwLock};
 use std::time::Duration;
 use std::time::Instant;
 use tracing::*;
@@ -93,7 +92,6 @@ pub trait WalRedoManager: Send + Sync {
 }
 
 struct ProcessInput {
-    child: NoLeakChild,
     stdin: ChildStdin,
     stderr_fd: RawFd,
     stdout_fd: RawFd,
@@ -116,13 +114,7 @@ struct ProcessOutput {
 pub struct PostgresRedoManager {
     tenant_id: TenantId,
     conf: &'static PageServerConf,
-    /// Counter to separate same sized walredo inputs failing at the same millisecond.
-    #[cfg(feature = "testing")]
-    dump_sequence: AtomicUsize,
-
-    stdout: Mutex<Option<ProcessOutput>>,
-    stdin: Mutex<Option<ProcessInput>>,
-    stderr: Mutex<Option<ChildStderr>>,
+    redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
 }
 
 /// Can this request be served by neon redo functions
@@ -215,11 +207,7 @@ impl PostgresRedoManager {
         PostgresRedoManager {
             tenant_id,
             conf,
-            #[cfg(feature = "testing")]
-            dump_sequence: AtomicUsize::default(),
-            stdin: Mutex::new(None),
-            stdout: Mutex::new(None),
-            stderr: Mutex::new(None),
+            redo_process: RwLock::new(None),
         }
     }
 
@@ -242,20 +230,38 @@ impl PostgresRedoManager {
         let start_time = Instant::now();
         let mut n_attempts = 0u32;
         loop {
-            let mut proc = self.stdin.lock().unwrap();
             let lock_time = Instant::now();
 
             // launch the WAL redo process on first use
-            if proc.is_none() {
-                self.launch(&mut proc, pg_version)
-                    .context("launch process")?;
-            }
+            let proc: Arc<WalRedoProcess> = {
+                let proc_guard = self.redo_process.read().unwrap();
+                match &*proc_guard {
+                    None => {
+                        // "upgrade" to write lock to launch the process
+                        drop(proc_guard);
+                        let mut proc_guard = self.redo_process.write().unwrap();
+                        match &*proc_guard {
+                            None => {
+                                let proc = Arc::new(
+                                    WalRedoProcess::launch(self.conf, self.tenant_id, pg_version)
+                                        .context("launch walredo process")?,
+                                );
+                                *proc_guard = Some(Arc::clone(&proc));
+                                proc
+                            }
+                            Some(proc) => Arc::clone(proc),
+                        }
+                    }
+                    Some(proc) => Arc::clone(proc),
+                }
+            };
+
             WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
 
             // Relational WAL records are applied using wal-redo-postgres
             let buf_tag = BufferTag { rel, blknum };
-            let result = self
-                .apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
+            let result = proc
+                .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
                 .context("apply_wal_records");
 
             let end_time = Instant::now();
@@ -296,22 +302,34 @@ impl PostgresRedoManager {
                     n_attempts,
                     e,
                 );
-                // self.stdin only holds stdin & stderr as_raw_fd().
-                // Dropping it as part of take() doesn't close them.
-                // The owning objects (ChildStdout and ChildStderr) are stored in
-                // self.stdout and self.stderr, respsectively.
-                // We intentionally keep them open here to avoid a race between
-                // currently running `apply_wal_records()` and a `launch()` call
-                // after we return here.
-                // The currently running `apply_wal_records()` must not read from
-                // the newly launched process.
-                // By keeping self.stdout and self.stderr open here, `launch()` will
-                // get other file descriptors for the new child's stdout and stderr,
-                // and hence the current `apply_wal_records()` calls will observe
-                // `output.stdout.as_raw_fd() != stdout_fd` .
-                if let Some(proc) = self.stdin.lock().unwrap().take() {
-                    proc.child.kill_and_wait();
+                // Avoid concurrent callers hitting the same issue.
+                // We can't prevent it from happening because we want to enable parallelism.
+                let mut guard = self.redo_process.write().unwrap();
+                match &*guard {
+                    Some(current_field_value) => {
+                        if Arc::ptr_eq(current_field_value, &proc) {
+                            // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
+                            *guard = None;
+                        }
+                    }
+                    None => {
+                        // Another thread was faster to observe the error, and already took the process out of rotation.
+                    }
                 }
+                drop(guard);
+                // NB: there may still be other concurrent threads using `proc`.
+                // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
+                // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
+                // holding the lock while waiting for the process to exit.
+                // NB: the drop impl blocks the current thread with a wait() system call for
+                // the child process. We dropped the `guard` above so that other threads aren't
+                // affected. But, it's good that the current thread _does_ block to wait.
+                // If we instead deferred the waiting into the background / to tokio, it could
+                // happen that if walredo always fails immediately, we spawn processes faster
+                // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
+                // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
+                // This probably needs revisiting at some later point.
+                drop(proc);
             } else if n_attempts != 0 {
                 info!(n_attempts, "retried walredo succeeded");
             }
@@ -614,24 +632,32 @@ impl<C: CommandExt> CloseFileDescriptors for C {
     }
 }
 
-impl PostgresRedoManager {
+struct WalRedoProcess {
+    #[allow(dead_code)]
+    conf: &'static PageServerConf,
+    tenant_id: TenantId,
+    // Some() on construction, only becomes None on Drop.
+    child: Option<NoLeakChild>,
+    stdout: Mutex<ProcessOutput>,
+    stdin: Mutex<ProcessInput>,
+    stderr: Mutex<ChildStderr>,
+    /// Counter to separate same sized walredo inputs failing at the same millisecond.
+    #[cfg(feature = "testing")]
+    dump_sequence: AtomicUsize,
+}
+
+impl WalRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
-    #[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
+    #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
     fn launch(
-        &self,
-        input: &mut MutexGuard<Option<ProcessInput>>,
+        conf: &'static PageServerConf,
+        tenant_id: TenantId,
         pg_version: u32,
-    ) -> Result<(), Error> {
-        let pg_bin_dir_path = self
-            .conf
-            .pg_bin_dir(pg_version)
-            .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
-        let pg_lib_dir_path = self
-            .conf
-            .pg_lib_dir(pg_version)
-            .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
+    ) -> anyhow::Result<Self> {
+        let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
+        let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
 
         // Start postgres itself
         let child = Command::new(pg_bin_dir_path.join("postgres"))
@@ -652,13 +678,8 @@ impl PostgresRedoManager {
             // as close-on-exec by default, but that's not enough, since we use
            // libraries that directly call libc open without setting that flag.
             .close_fds()
-            .spawn_no_leak_child(self.tenant_id)
-            .map_err(|e| {
-                Error::new(
-                    e.kind(),
-                    format!("postgres --wal-redo command failed to start: {}", e),
-                )
-            })?;
+            .spawn_no_leak_child(tenant_id)
+            .context("spawn process")?;
 
         let mut child = scopeguard::guard(child, |child| {
             error!("killing wal-redo-postgres process due to a problem during launch");
@@ -685,36 +706,47 @@ impl PostgresRedoManager {
         // all fallible operations post-spawn are complete, so get rid of the guard
         let child = scopeguard::ScopeGuard::into_inner(child);
 
-        **input = Some(ProcessInput {
-            child,
-            stdout_fd: stdout.as_raw_fd(),
-            stderr_fd: stderr.as_raw_fd(),
-            stdin,
-            n_requests: 0,
-        });
+        Ok(Self {
+            conf,
+            tenant_id,
+            child: Some(child),
+            stdin: Mutex::new(ProcessInput {
+                stdout_fd: stdout.as_raw_fd(),
+                stderr_fd: stderr.as_raw_fd(),
+                stdin,
+                n_requests: 0,
+            }),
+            stdout: Mutex::new(ProcessOutput {
+                stdout,
+                pending_responses: VecDeque::new(),
+                n_processed_responses: 0,
+            }),
+            stderr: Mutex::new(stderr),
+            #[cfg(feature = "testing")]
+            dump_sequence: AtomicUsize::default(),
+        })
+    }
 
-        *self.stdout.lock().unwrap() = Some(ProcessOutput {
-            stdout,
-            pending_responses: VecDeque::new(),
-            n_processed_responses: 0,
-        });
-        *self.stderr.lock().unwrap() = Some(stderr);
-
-        Ok(())
+    fn id(&self) -> u32 {
+        self.child
+            .as_ref()
+            .expect("must not call this during Drop")
+            .id()
     }
 
     // Apply given WAL records ('records') over an old page image. Returns
     // new page image.
     //
-    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
+    #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))]
     fn apply_wal_records(
         &self,
-        input: MutexGuard<Option<ProcessInput>>,
         tag: BufferTag,
         base_img: &Option<Bytes>,
         records: &[(Lsn, NeonWalRecord)],
         wal_redo_timeout: Duration,
     ) -> anyhow::Result<Bytes> {
+        let input = self.stdin.lock().unwrap();
+
         // Serialize all the messages to send the WAL redo process first.
         //
         // This could be problematic if there are millions of records to replay,
@@ -757,18 +789,17 @@ impl PostgresRedoManager {
     fn apply_wal_records0(
         &self,
         writebuf: &[u8],
-        mut input: MutexGuard<Option<ProcessInput>>,
+        input: MutexGuard<ProcessInput>,
         wal_redo_timeout: Duration,
     ) -> anyhow::Result<Bytes> {
-        let proc = input.as_mut().unwrap();
+        let mut proc = { input }; // TODO: remove this legacy rename, but this keeps the patch small.
         let mut nwrite = 0usize;
-        let stdout_fd = proc.stdout_fd;
 
         // Prepare for calling poll()
         let mut pollfds = [
             PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
             PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
-            PollFd::new(stdout_fd, PollFlags::POLLIN),
+            PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
         ];
 
         // We do two things simultaneously: send the old base image and WAL records to
@@ -790,8 +821,7 @@ impl PostgresRedoManager {
             let err_revents = pollfds[1].revents().unwrap();
             if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                 let mut errbuf: [u8; 16384] = [0; 16384];
-                let mut stderr_guard = self.stderr.lock().unwrap();
-                let stderr = stderr_guard.as_mut().unwrap();
+                let mut stderr = self.stderr.lock().unwrap();
                 let len = stderr.read(&mut errbuf)?;
 
                 // The message might not be split correctly into lines here. But this is
@@ -821,7 +851,7 @@ impl PostgresRedoManager {
         }
         let request_no = proc.n_requests;
         proc.n_requests += 1;
-        drop(input);
+        drop(proc);
 
         // To improve walredo performance we separate sending requests and receiving
        // responses. Them are protected by different mutexes (output and input).
@@ -835,20 +865,7 @@ impl PostgresRedoManager {
         // pending responses ring buffer and truncate all empty elements from the front,
         // advancing processed responses number.
 
-        let mut output_guard = self.stdout.lock().unwrap();
-        let output = output_guard.as_mut().unwrap();
-        if output.stdout.as_raw_fd() != stdout_fd {
-            // If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
-            // As far as ProcessInput and ProcessOutout are protected by different mutexes,
-            // it can happen that we send request to one process and waiting response from another.
-            // To prevent such situation we compare stdout file descriptors.
-            // As far as old stdout pipe is destroyed only after new one is created,
-            // it can not reuse the same file descriptor, so this check is safe.
-            //
-            // Cross-read this with the comment in apply_batch_postgres if result.is_err().
-            // That's where we kill the child process.
-            anyhow::bail!("WAL redo process closed its stdout unexpectedly");
-        }
+        let mut output = self.stdout.lock().unwrap();
         let n_processed_responses = output.n_processed_responses;
         while n_processed_responses + output.pending_responses.len() <= request_no {
             // We expect the WAL redo process to respond with an 8k page image. We read it
@@ -873,8 +890,7 @@ impl PostgresRedoManager {
             let err_revents = pollfds[1].revents().unwrap();
             if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                 let mut errbuf: [u8; 16384] = [0; 16384];
-                let mut stderr_guard = self.stderr.lock().unwrap();
-                let stderr = stderr_guard.as_mut().unwrap();
+                let mut stderr = self.stderr.lock().unwrap();
                 let len = stderr.read(&mut errbuf)?;
 
                 // The message might not be split correctly into lines here. But this is
@@ -984,6 +1000,15 @@ impl PostgresRedoManager {
     fn record_and_log(&self, _: &[u8]) {}
 }
 
+impl Drop for WalRedoProcess {
+    fn drop(&mut self) {
+        self.child
+            .take()
+            .expect("we only do this once")
+            .kill_and_wait();
+    }
+}
+
 /// Wrapper type around `std::process::Child` which guarantees that the child
 /// will be killed and waited-for by this process before being dropped.
 struct NoLeakChild {
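For readers skimming the hunks above, the patch boils down to one lifecycle pattern: a single `RwLock<Option<Arc<WalRedoProcess>>>` slot, a read-lock fast path with a write-lock "upgrade" for lazy launch, `Arc::ptr_eq` so that only the first caller to observe an error evicts the failed process, and a `Drop` impl that kills and waits for the child once the last `Arc` is dropped. The sketch below is illustrative only and not part of the diff; `Manager`, `FakeProcess`, and their methods are invented stand-ins for `PostgresRedoManager` and `WalRedoProcess`.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

/// Stand-in for `WalRedoProcess`: "launching" just prints, and `Drop` models kill_and_wait().
struct FakeProcess {
    id: u32,
}

impl FakeProcess {
    fn launch(id: u32) -> Self {
        println!("launched process {id}");
        FakeProcess { id }
    }

    /// Stand-in for `apply_wal_records`: fails on empty input so the error path can be exercised.
    fn apply(&self, input: &str) -> Result<String, String> {
        if input.is_empty() {
            Err(format!("process {} failed", self.id))
        } else {
            Ok(input.to_uppercase())
        }
    }
}

impl Drop for FakeProcess {
    fn drop(&mut self) {
        // In the real patch this is SIGKILL + wait() on the child process.
        println!("killed and waited for process {}", self.id);
    }
}

/// Stand-in for `PostgresRedoManager`: one shared slot, lazily populated.
struct Manager {
    slot: RwLock<Option<Arc<FakeProcess>>>,
    next_id: AtomicU32,
}

impl Manager {
    fn get_or_launch(&self) -> Arc<FakeProcess> {
        // Fast path: concurrent callers only take the shared read lock.
        if let Some(proc) = &*self.slot.read().unwrap() {
            return Arc::clone(proc);
        }
        // "Upgrade": re-check under the write lock, another thread may have won the race.
        let mut guard = self.slot.write().unwrap();
        match &*guard {
            Some(proc) => Arc::clone(proc),
            None => {
                let id = self.next_id.fetch_add(1, Ordering::Relaxed);
                let proc = Arc::new(FakeProcess::launch(id));
                *guard = Some(Arc::clone(&proc));
                proc
            }
        }
    }

    fn request(&self, input: &str) -> Result<String, String> {
        let proc = self.get_or_launch();
        let result = proc.apply(input);
        if result.is_err() {
            let mut guard = self.slot.write().unwrap();
            // Evict only if the slot still holds the process *we* used; otherwise another
            // caller already evicted it (and may have launched a replacement we must not kill).
            if guard.as_ref().is_some_and(|cur| Arc::ptr_eq(cur, &proc)) {
                *guard = None;
            }
            drop(guard); // release the lock before the potentially blocking reap below
            drop(proc); // whoever drops the last Arc reaps the process (FakeProcess::drop)
        }
        result
    }
}

fn main() {
    let mgr = Manager {
        slot: RwLock::new(None),
        next_id: AtomicU32::new(1),
    };
    println!("{:?}", mgr.request("hello")); // launches process 1, succeeds
    println!("{:?}", mgr.request("")); // fails, evicts and reaps process 1
    println!("{:?}", mgr.request("again")); // launches process 2
}
```

The property mirrored from the patch is that eviction and reaping are decoupled: the write lock is held only long enough to swap the slot, while the blocking wait (here just a `println!`) happens after the lock is released, on whichever thread drops the last `Arc`.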