use self::no_leak_child::NoLeakChild;
use crate::{
    config::PageServerConf,
    metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
    walrecord::NeonWalRecord,
    walredo::process::{no_leak_child, protocol},
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
    collections::VecDeque,
    io::{Read, Write},
    process::{ChildStdin, ChildStdout, Command, Stdio},
    sync::{Mutex, MutexGuard},
    time::Duration,
};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};

pub struct WalRedoProcess {
    #[allow(dead_code)]
    conf: &'static PageServerConf,
    tenant_shard_id: TenantShardId,
    // Some() on construction, only becomes None on Drop.
    child: Option<NoLeakChild>,
    stdout: Mutex<ProcessOutput>,
    stdin: Mutex<ProcessInput>,
    /// Counter to separate same-sized walredo inputs failing at the same millisecond.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}

struct ProcessInput {
    stdin: ChildStdin,
    n_requests: usize,
}

struct ProcessOutput {
    stdout: ChildStdout,
    pending_responses: VecDeque<Option<Bytes>>,
    n_processed_responses: usize,
}
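// How the pieces above fit together, roughly: `launch` spawns a postgres child
// in WAL redo mode and wires up its pipes; each `apply_wal_records` call
// serializes one request and writes it under the `stdin` mutex, then collects
// the reply under the `stdout` mutex (see the ordering notes in
// `apply_wal_records0`); dropping the `WalRedoProcess` kills the child.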
impl WalRedoProcess {
    //
    // Start postgres binary in special WAL redo mode.
    //
    #[instrument(skip_all,fields(pg_version=pg_version))]
    pub(crate) fn launch(
        conf: &'static PageServerConf,
        tenant_shard_id: TenantShardId,
        pg_version: u32,
    ) -> anyhow::Result<Self> {
        crate::span::debug_assert_current_span_has_tenant_id();

        let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
        let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

        use no_leak_child::NoLeakChildCommandExt;
        // Start postgres itself
        let child = Command::new(pg_bin_dir_path.join("postgres"))
            // the first arg must be --wal-redo so the child process enters into walredo mode
            .arg("--wal-redo")
            // the child doesn't process this arg, but, having it in the argv helps identify the
            // walredo process for a particular tenant when debugging a pageserver
            .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .env_clear()
            .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
            .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
            // NB: The redo process is not trusted after we sent it the first
            // walredo work. Before that, it is trusted. Specifically, we trust
            // it to
            // 1. close all file descriptors except stdin, stdout, stderr because
            //    pageserver might not be 100% diligent in setting FD_CLOEXEC on all
            //    the files it opens, and
            // 2. to use seccomp to sandbox itself before processing the first
            //    walredo request.
            .spawn_no_leak_child(tenant_shard_id)
            .context("spawn process")?;
        WAL_REDO_PROCESS_COUNTERS.started.inc();
        let mut child = scopeguard::guard(child, |child| {
            error!("killing wal-redo-postgres process due to a problem during launch");
            child.kill_and_wait(WalRedoKillCause::Startup);
        });

        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();
        let stderr = tokio::process::ChildStderr::from_std(stderr)
            .context("convert to tokio::ChildStderr")?;

        macro_rules! set_nonblock_or_log_err {
            ($file:ident) => {{
                let res = set_nonblock($file.as_raw_fd());
                if let Err(e) = &res {
                    error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
                }
                res
            }};
        }
        set_nonblock_or_log_err!(stdin)?;
        set_nonblock_or_log_err!(stdout)?;

        // all fallible operations post-spawn are complete, so get rid of the guard
        let child = scopeguard::ScopeGuard::into_inner(child);

        tokio::spawn(
            async move {
                scopeguard::defer! {
                    debug!("wal-redo-postgres stderr_logger_task finished");
                    crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
                }
                debug!("wal-redo-postgres stderr_logger_task started");
                crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();

                use tokio::io::AsyncBufReadExt;
                let mut stderr_lines = tokio::io::BufReader::new(stderr);
                let mut buf = Vec::new();
                let res = loop {
                    buf.clear();
                    // TODO we don't trust the process to cap its stderr length.
                    // Currently it can do unbounded Vec allocation.
                    match stderr_lines.read_until(b'\n', &mut buf).await {
                        Ok(0) => break Ok(()), // eof
                        Ok(num_bytes) => {
                            let output = String::from_utf8_lossy(&buf[..num_bytes]);
                            error!(%output, "received output");
                        }
                        Err(e) => {
                            break Err(e);
                        }
                    }
                };
                match res {
                    Ok(()) => (),
                    Err(e) => {
                        error!(error=?e, "failed to read from walredo stderr");
                    }
                }
            }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
        );

        Ok(Self {
            conf,
            tenant_shard_id,
            child: Some(child),
            stdin: Mutex::new(ProcessInput {
                stdin,
                n_requests: 0,
            }),
            stdout: Mutex::new(ProcessOutput {
                stdout,
                pending_responses: VecDeque::new(),
                n_processed_responses: 0,
            }),
            #[cfg(feature = "testing")]
            dump_sequence: AtomicUsize::default(),
        })
    }

    pub(crate) fn id(&self) -> u32 {
        self.child
            .as_ref()
            .expect("must not call this during Drop")
            .id()
    }

    // Apply given WAL records ('records') over an old page image. Returns
    // new page image.
    //
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
    pub(crate) async fn apply_wal_records(
        &self,
        rel: RelTag,
        blknum: u32,
        base_img: &Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let tag = protocol::BufferTag { rel, blknum };
        let input = self.stdin.lock().unwrap();

        // Serialize all the messages to send to the WAL redo process first.
        //
        // This could be problematic if there are millions of records to replay,
        // but in practice the number of records is usually so small that it doesn't
        // matter, and it's better to keep this code simple.
        //
        // Most requests start with a before-image of BLCKSZ bytes, followed by
        // some other WAL records. Start with a buffer that can hold that
        // comfortably.
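        // For orientation, a sketch of the request that gets serialized below
        // (labels are descriptive, not literal wire bytes; the exact framing
        // lives in `walredo::process::protocol`):
        //
        //   begin_redo_for_block(tag)   via build_begin_redo_for_block_msg
        //   push_page(tag, base_img)    via build_push_page_msg, only if we have a base image
        //   apply_record(lsn, rec)      via build_apply_record_msg, once per WAL record
        //   get_page(tag)               via build_get_page_msg
        //
        // after which the process is expected to reply with a single
        // BLCKSZ-sized page image.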
        let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
        protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
        if let Some(img) = base_img {
            protocol::build_push_page_msg(tag, img, &mut writebuf);
        }
        for (lsn, rec) in records.iter() {
            if let NeonWalRecord::Postgres {
                will_init: _,
                rec: postgres_rec,
            } = rec
            {
                protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
            } else {
                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
            }
        }
        protocol::build_get_page_msg(tag, &mut writebuf);
        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

        let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);

        if res.is_err() {
            // Not all of these failures can be caused by this particular input,
            // but they are so rare in tests that we capture them all.
            self.record_and_log(&writebuf);
        }

        res
    }

    fn apply_wal_records0(
        &self,
        writebuf: &[u8],
        input: MutexGuard<ProcessInput>,
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let mut proc = { input }; // TODO: remove this legacy rename, but it keeps the patch small.
        let mut nwrite = 0usize;

        while nwrite < writebuf.len() {
            let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
            let n = loop {
                match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
                    Err(nix::errno::Errno::EINTR) => continue,
                    res => break res,
                }
            }?;

            if n == 0 {
                anyhow::bail!("WAL redo timed out");
            }

            // If 'stdin' is writeable, do write.
            let in_revents = stdin_pollfds[0].revents().unwrap();
            if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
                nwrite += proc.stdin.write(&writebuf[nwrite..])?;
            }
            if in_revents.contains(PollFlags::POLLHUP) {
                // We still have more data to write, but the process closed the pipe.
                anyhow::bail!("WAL redo process closed its stdin unexpectedly");
            }
        }
        let request_no = proc.n_requests;
        proc.n_requests += 1;
        drop(proc);

        // To improve walredo performance we separate sending requests and receiving
        // responses. They are protected by different mutexes (input and output).
        // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process,
        // there is no guarantee that T1 will be granted the output mutex first.
        // To address this issue we maintain the number of sent requests, the number
        // of processed responses, and a ring buffer of pending responses. After
        // sending a request (under the input mutex), a thread remembers its request
        // number. It then releases the input mutex, locks the output mutex, and
        // fetches into the ring buffer all responses up to its stored request
        // number. Then it takes the corresponding element from the pending
        // responses ring buffer and truncates any empty elements from the front,
        // advancing the processed responses number.
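        // Stated as an invariant (a sketch, implied by the bookkeeping below):
        // pending_responses[i], when Some, holds the response for request
        // number n_processed_responses + i, and leading `None`s are compacted
        // away as soon as they reach the front of the queue.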
        let mut output = self.stdout.lock().unwrap();
        let n_processed_responses = output.n_processed_responses;
        while n_processed_responses + output.pending_responses.len() <= request_no {
            // We expect the WAL redo process to respond with an 8k page image. We read it
            // into this buffer.
            let mut resultbuf = vec![0; BLCKSZ.into()];
            let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
            while nresult < BLCKSZ.into() {
                let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
                // We do two things simultaneously: read the response from stdout
                // and forward any logging information that the child writes to its
                // stderr to the pageserver's log.
                let n = loop {
                    match nix::poll::poll(
                        &mut stdout_pollfds[..],
                        wal_redo_timeout.as_millis() as i32,
                    ) {
                        Err(nix::errno::Errno::EINTR) => continue,
                        res => break res,
                    }
                }?;

                if n == 0 {
                    anyhow::bail!("WAL redo timed out");
                }

                // If we have some data in stdout, read it to the result buffer.
                let out_revents = stdout_pollfds[0].revents().unwrap();
                if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                    nresult += output.stdout.read(&mut resultbuf[nresult..])?;
                }
                if out_revents.contains(PollFlags::POLLHUP) {
                    anyhow::bail!("WAL redo process closed its stdout unexpectedly");
                }
            }
            output
                .pending_responses
                .push_back(Some(Bytes::from(resultbuf)));
        }
        // Replace our request's response with None in `pending_responses`.
        // Then make space in the ring buffer by clearing out any sequence of contiguous
        // `None`s from the front of `pending_responses`.
        // NB: We can't pop_front() other requests' responses, because another
        // requester might have grabbed the output mutex before us:
        // T1: grab input mutex
        // T1: send request_no 23
        // T1: release input mutex
        // T2: grab input mutex
        // T2: send request_no 24
        // T2: release input mutex
        // T2: grab output mutex
        // T2: n_processed_responses + output.pending_responses.len() <= request_no
        //            23                                0                   24
        // T2: enters poll loop that reads stdout
        // T2: put response for 23 into pending_responses
        // T2: put response for 24 into pending_responses
        // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
        // T2: takes its response_24
        // pending_responses now looks like this: Front Some(response_23) None Back
        // T2: does the while loop below
        // pending_responses now looks like this: Front Some(response_23) None Back
        // T2: releases output mutex
        // T1: grabs output mutex
        // T1: n_processed_responses + output.pending_responses.len() > request_no
        //                  23                 2                          23
        // T1: skips poll loop that reads stdout
        // T1: takes its response_23
        // pending_responses now looks like this: Front None None Back
        // T1: does the while loop below
        // pending_responses now looks like this: Front Back
        // n_processed_responses now has value 25
        // (this trace is exercised by the illustrative sketch module at the end of this file)
        let res = output.pending_responses[request_no - n_processed_responses]
            .take()
            .expect("we own this request_no, nobody else is supposed to take it");
        while let Some(front) = output.pending_responses.front() {
            if front.is_none() {
                output.pending_responses.pop_front();
                output.n_processed_responses += 1;
            } else {
                break;
            }
        }
        Ok(res)
    }

    #[cfg(feature = "testing")]
    fn record_and_log(&self, writebuf: &[u8]) {
        use std::sync::atomic::Ordering;

        let millis = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

        // these files will be collected to an allure report
        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

        let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

        let res = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .read(true)
            .open(path)
            .and_then(|mut f| f.write_all(writebuf));

        // trip up allowed_errors
        if let Err(e) = res {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        } else {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }

    #[cfg(not(feature = "testing"))]
    fn record_and_log(&self, _: &[u8]) {}
}

impl Drop for WalRedoProcess {
    fn drop(&mut self) {
        self.child
            .take()
            .expect("we only do this once")
            .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
        // no way to wait for stderr_logger_task from Drop because that is async only
    }
}
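// A minimal, self-contained sketch (hypothetical, not part of the production
// path) of the `pending_responses` bookkeeping that `apply_wal_records0`
// performs, mirroring the T1/T2 trace in its comments. Responses are modeled
// as `u32`s instead of page images.
#[cfg(test)]
mod pending_responses_sketch {
    use std::collections::VecDeque;

    #[test]
    fn take_then_compact_mirrors_the_trace() {
        // State after T2's poll loop stored responses for requests 23 and 24,
        // with 23 responses already processed.
        let mut pending: VecDeque<Option<u32>> = VecDeque::from([Some(23), Some(24)]);
        let mut n_processed: usize = 23;

        // T2 takes its response for request 24 at index 24 - n_processed.
        assert_eq!(pending[24 - n_processed].take(), Some(24));
        // The front is still Some(23), so the compaction loop removes nothing.
        while let Some(None) = pending.front() {
            pending.pop_front();
            n_processed += 1;
        }
        assert_eq!(pending.len(), 2);

        // T1 takes its response for request 23; compaction then drops both
        // leading `None`s, advancing n_processed to 25.
        assert_eq!(pending[23 - n_processed].take(), Some(23));
        while let Some(None) = pending.front() {
            pending.pop_front();
            n_processed += 1;
        }
        assert!(pending.is_empty());
        assert_eq!(n_processed, 25);
    }
}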