diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 5bc897b730..773e5fc051 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -17,71 +17,30 @@ //! records. It achieves it by dropping privileges before replaying //! any WAL records, so that even if an attacker hijacks the Postgres //! process, he cannot escape out of it. -//! -use anyhow::Context; -use byteorder::{ByteOrder, LittleEndian}; -use bytes::{BufMut, Bytes, BytesMut}; -use nix::poll::*; -use pageserver_api::models::WalRedoManagerStatus; -use pageserver_api::shard::TenantShardId; -use serde::Serialize; -use std::collections::VecDeque; -use std::io; -use std::io::prelude::*; -use std::ops::{Deref, DerefMut}; -use std::os::unix::io::AsRawFd; -use std::process::Stdio; -use std::process::{Child, ChildStdin, ChildStdout, Command}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; -use std::time::Duration; -use std::time::Instant; -use tracing::*; -use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock}; -#[cfg(feature = "testing")] -use std::sync::atomic::{AtomicUsize, Ordering}; +/// Process lifecycle and abstraction for the IPC protocol. +mod process; + +/// Code to apply [`NeonWalRecord`]s. 
+mod apply_neon; use crate::config::PageServerConf; use crate::metrics::{ - WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS, - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, - WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME, + WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, + WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME, }; use crate::repository::Key; use crate::walrecord::NeonWalRecord; - -use pageserver_api::key::{key_to_rel_block, key_to_slru_block}; -use pageserver_api::reltag::{RelTag, SlruKind}; -use postgres_ffi::pg_constants; -use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; -use postgres_ffi::v14::nonrelfile_utils::{ - mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, - transaction_id_set_status, -}; -use postgres_ffi::BLCKSZ; - -/// -/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. -/// -/// In Postgres `BufferTag` structure is used for exactly the same purpose. -/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91). 
-/// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)] -pub(crate) struct BufferTag { - pub rel: RelTag, - pub blknum: u32, -} - -struct ProcessInput { - stdin: ChildStdin, - n_requests: usize, -} - -struct ProcessOutput { - stdout: ChildStdout, - pending_responses: VecDeque<Option<Bytes>>, - n_processed_responses: usize, -} +use anyhow::Context; +use bytes::{Bytes, BytesMut}; +use pageserver_api::key::key_to_rel_block; +use pageserver_api::models::WalRedoManagerStatus; +use pageserver_api::shard::TenantShardId; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use std::time::Instant; +use tracing::*; +use utils::lsn::Lsn; /// /// This is the real implementation that uses a Postgres process to @@ -94,22 +53,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex<Option<Instant>>, - redo_process: RwLock<Option<Arc<WalRedoProcess>>>, -} - -/// Can this request be served by neon redo functions -/// or we need to pass it to wal-redo postgres process? -fn can_apply_in_neon(rec: &NeonWalRecord) -> bool { - // Currently, we don't have bespoken Rust code to replay any - // Postgres WAL records. But everything else is handled in neon. 
- #[allow(clippy::match_like_matches_macro)] - match rec { - NeonWalRecord::Postgres { - will_init: _, - rec: _, - } => false, - _ => true, - } + redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>, } /// @@ -139,10 +83,10 @@ impl PostgresRedoManager { let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID); let mut img = base_img.map(|p| p.1); - let mut batch_neon = can_apply_in_neon(&records[0].1); + let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1); let mut batch_start = 0; for (i, record) in records.iter().enumerate().skip(1) { - let rec_neon = can_apply_in_neon(&record.1); + let rec_neon = apply_neon::can_apply_in_neon(&record.1); if rec_neon != batch_neon { let result = if batch_neon { @@ -248,7 +192,7 @@ impl PostgresRedoManager { let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc: Arc<WalRedoProcess> = { + let proc: Arc<process::WalRedoProcess> = { let proc_guard = self.redo_process.read().unwrap(); match &*proc_guard { None => { @@ -259,7 +203,7 @@ impl PostgresRedoManager { None => { let start = Instant::now(); let proc = Arc::new( - WalRedoProcess::launch( + process::WalRedoProcess::launch( self.conf, self.tenant_shard_id, pg_version, @@ -287,9 +231,8 @@ impl PostgresRedoManager { let started_at = std::time::Instant::now(); // Relational WAL records are applied using wal-redo-postgres - let buf_tag = BufferTag { rel, blknum }; let result = proc - .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout) + .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout) .context("apply_wal_records"); let duration = started_at.elapsed(); @@ -416,732 +359,12 @@ impl PostgresRedoManager { _record_lsn: Lsn, record: &NeonWalRecord, ) -> anyhow::Result<()> { - match record { - NeonWalRecord::Postgres { - will_init: _, - rec: _, - } => { - anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); - } - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno, - flags, - } => { - // sanity check that this is modifying 
the correct relation - let (rel, blknum) = key_to_rel_block(key).context("invalid record")?; - assert!( - rel.forknum == VISIBILITYMAP_FORKNUM, - "ClearVisibilityMapFlags record on unexpected rel {}", - rel - ); - if let Some(heap_blkno) = *new_heap_blkno { - // Calculate the VM block and offset that corresponds to the heap block. - let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); - let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); - let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); - - // Check that we're modifying the correct VM block. - assert!(map_block == blknum); - - // equivalent to PageGetContents(page) - let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; - - map[map_byte as usize] &= !(flags << map_offset); - } - - // Repeat for 'old_heap_blkno', if any - if let Some(heap_blkno) = *old_heap_blkno { - let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); - let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); - let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); - - assert!(map_block == blknum); - - let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; - - map[map_byte as usize] &= !(flags << map_offset); - } - } - // Non-relational WAL records are handled here, with custom code that has the - // same effects as the corresponding Postgres WAL redo function. - NeonWalRecord::ClogSetCommitted { xids, timestamp } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::Clog, - "ClogSetCommitted record with unexpected key {}", - key - ); - for &xid in xids { - let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - - // Check that we're modifying the correct CLOG block. 
- assert!( - segno == expected_segno, - "ClogSetCommitted record for XID {} with unexpected key {}", - xid, - key - ); - assert!( - blknum == expected_blknum, - "ClogSetCommitted record for XID {} with unexpected key {}", - xid, - key - ); - - transaction_id_set_status( - xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - page, - ); - } - - // Append the timestamp - if page.len() == BLCKSZ as usize + 8 { - page.truncate(BLCKSZ as usize); - } - if page.len() == BLCKSZ as usize { - page.extend_from_slice(&timestamp.to_be_bytes()); - } else { - warn!( - "CLOG blk {} in seg {} has invalid size {}", - blknum, - segno, - page.len() - ); - } - } - NeonWalRecord::ClogSetAborted { xids } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::Clog, - "ClogSetAborted record with unexpected key {}", - key - ); - for &xid in xids { - let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - - // Check that we're modifying the correct CLOG block. - assert!( - segno == expected_segno, - "ClogSetAborted record for XID {} with unexpected key {}", - xid, - key - ); - assert!( - blknum == expected_blknum, - "ClogSetAborted record for XID {} with unexpected key {}", - xid, - key - ); - - transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page); - } - } - NeonWalRecord::MultixactOffsetCreate { mid, moff } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::MultiXactOffsets, - "MultixactOffsetCreate record with unexpected key {}", - key - ); - // Compute the block and offset to modify. - // See RecordNewMultiXact in PostgreSQL sources. 
- let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let offset = (entryno * 4) as usize; - - // Check that we're modifying the correct multixact-offsets block. - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - assert!( - segno == expected_segno, - "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", - mid, - key - ); - assert!( - blknum == expected_blknum, - "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", - mid, - key - ); - - LittleEndian::write_u32(&mut page[offset..offset + 4], *moff); - } - NeonWalRecord::MultixactMembersCreate { moff, members } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::MultiXactMembers, - "MultixactMembersCreate record with unexpected key {}", - key - ); - for (i, member) in members.iter().enumerate() { - let offset = moff + i as u32; - - // Compute the block and offset to modify. - // See RecordNewMultiXact in PostgreSQL sources. - let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - - // Check that we're modifying the correct multixact-members block. 
- let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - assert!( - segno == expected_segno, - "MultiXactMembersCreate record for offset {} with unexpected key {}", - moff, - key - ); - assert!( - blknum == expected_blknum, - "MultiXactMembersCreate record for offset {} with unexpected key {}", - moff, - key - ); - - let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); - flagsval |= member.status << bshift; - LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval); - LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); - } - } - } + apply_neon::apply_in_neon(record, key, page)?; Ok(()) } } -struct WalRedoProcess { - #[allow(dead_code)] - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - // Some() on construction, only becomes None on Drop. - child: Option, - stdout: Mutex, - stdin: Mutex, - /// Counter to separate same sized walredo inputs failing at the same millisecond. - #[cfg(feature = "testing")] - dump_sequence: AtomicUsize, -} - -impl WalRedoProcess { - // - // Start postgres binary in special WAL redo mode. - // - #[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))] - fn launch( - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - pg_version: u32, - ) -> anyhow::Result { - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. 
- let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?; - - // Start postgres itself - let child = Command::new(pg_bin_dir_path.join("postgres")) - // the first arg must be --wal-redo so the child process enters into walredo mode - .arg("--wal-redo") - // the child doesn't process this arg, but, having it in the argv helps indentify the - // walredo process for a particular tenant when debugging a pagserver - .args(["--tenant-shard-id", &format!("{tenant_shard_id}")]) - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir_path) - .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) - // NB: The redo process is not trusted after we sent it the first - // walredo work. Before that, it is trusted. Specifically, we trust - // it to - // 1. close all file descriptors except stdin, stdout, stderr because - // pageserver might not be 100% diligent in setting FD_CLOEXEC on all - // the files it opens, and - // 2. to use seccomp to sandbox itself before processing the first - // walredo request. - .spawn_no_leak_child(tenant_shard_id) - .context("spawn process")?; - WAL_REDO_PROCESS_COUNTERS.started.inc(); - let mut child = scopeguard::guard(child, |child| { - error!("killing wal-redo-postgres process due to a problem during launch"); - child.kill_and_wait(WalRedoKillCause::Startup); - }); - - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - let stderr = tokio::process::ChildStderr::from_std(stderr) - .context("convert to tokio::ChildStderr")?; - macro_rules! 
set_nonblock_or_log_err { - ($file:ident) => {{ - let res = set_nonblock($file.as_raw_fd()); - if let Err(e) = &res { - error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed"); - } - res - }}; - } - set_nonblock_or_log_err!(stdin)?; - set_nonblock_or_log_err!(stdout)?; - - // all fallible operations post-spawn are complete, so get rid of the guard - let child = scopeguard::ScopeGuard::into_inner(child); - - tokio::spawn( - async move { - scopeguard::defer! { - debug!("wal-redo-postgres stderr_logger_task finished"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc(); - } - debug!("wal-redo-postgres stderr_logger_task started"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc(); - - use tokio::io::AsyncBufReadExt; - let mut stderr_lines = tokio::io::BufReader::new(stderr); - let mut buf = Vec::new(); - let res = loop { - buf.clear(); - // TODO we don't trust the process to cap its stderr length. - // Currently it can do unbounded Vec allocation. 
- match stderr_lines.read_until(b'\n', &mut buf).await { - Ok(0) => break Ok(()), // eof - Ok(num_bytes) => { - let output = String::from_utf8_lossy(&buf[..num_bytes]); - error!(%output, "received output"); - } - Err(e) => { - break Err(e); - } - } - }; - match res { - Ok(()) => (), - Err(e) => { - error!(error=?e, "failed to read from walredo stderr"); - } - } - }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) - ); - - Ok(Self { - conf, - tenant_shard_id, - child: Some(child), - stdin: Mutex::new(ProcessInput { - stdin, - n_requests: 0, - }), - stdout: Mutex::new(ProcessOutput { - stdout, - pending_responses: VecDeque::new(), - n_processed_responses: 0, - }), - #[cfg(feature = "testing")] - dump_sequence: AtomicUsize::default(), - }) - } - - fn id(&self) -> u32 { - self.child - .as_ref() - .expect("must not call this during Drop") - .id() - } - - // Apply given WAL records ('records') over an old page image. Returns - // new page image. - // - #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] - fn apply_wal_records( - &self, - tag: BufferTag, - base_img: &Option, - records: &[(Lsn, NeonWalRecord)], - wal_redo_timeout: Duration, - ) -> anyhow::Result { - let input = self.stdin.lock().unwrap(); - - // Serialize all the messages to send the WAL redo process first. - // - // This could be problematic if there are millions of records to replay, - // but in practice the number of records is usually so small that it doesn't - // matter, and it's better to keep this code simple. - // - // Most requests start with a before-image with BLCKSZ bytes, followed by - // by some other WAL records. Start with a buffer that can hold that - // comfortably. 
- let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); - build_begin_redo_for_block_msg(tag, &mut writebuf); - if let Some(img) = base_img { - build_push_page_msg(tag, img, &mut writebuf); - } - for (lsn, rec) in records.iter() { - if let NeonWalRecord::Postgres { - will_init: _, - rec: postgres_rec, - } = rec - { - build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); - } else { - anyhow::bail!("tried to pass neon wal record to postgres WAL redo"); - } - } - build_get_page_msg(tag, &mut writebuf); - WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - - let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout); - - if res.is_err() { - // not all of these can be caused by this particular input, however these are so rare - // in tests so capture all. - self.record_and_log(&writebuf); - } - - res - } - - fn apply_wal_records0( - &self, - writebuf: &[u8], - input: MutexGuard, - wal_redo_timeout: Duration, - ) -> anyhow::Result { - let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small. - let mut nwrite = 0usize; - - while nwrite < writebuf.len() { - let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)]; - let n = loop { - match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If 'stdin' is writeable, do write. - let in_revents = stdin_pollfds[0].revents().unwrap(); - if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { - nwrite += proc.stdin.write(&writebuf[nwrite..])?; - } - if in_revents.contains(PollFlags::POLLHUP) { - // We still have more data to write, but the process closed the pipe. 
- anyhow::bail!("WAL redo process closed its stdin unexpectedly"); - } - } - let request_no = proc.n_requests; - proc.n_requests += 1; - drop(proc); - - // To improve walredo performance we separate sending requests and receiving - // responses. Them are protected by different mutexes (output and input). - // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process - // then there is not warranty that T1 will first granted output mutex lock. - // To address this issue we maintain number of sent requests, number of processed - // responses and ring buffer with pending responses. After sending response - // (under input mutex), threads remembers request number. Then it releases - // input mutex, locks output mutex and fetch in ring buffer all responses until - // its stored request number. The it takes correspondent element from - // pending responses ring buffer and truncate all empty elements from the front, - // advancing processed responses number. - - let mut output = self.stdout.lock().unwrap(); - let n_processed_responses = output.n_processed_responses; - while n_processed_responses + output.pending_responses.len() <= request_no { - // We expect the WAL redo process to respond with an 8k page image. We read it - // into this buffer. - let mut resultbuf = vec![0; BLCKSZ.into()]; - let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far - while nresult < BLCKSZ.into() { - let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)]; - // We do two things simultaneously: reading response from stdout - // and forward any logging information that the child writes to its stderr to the page server's log. - let n = loop { - match nix::poll::poll( - &mut stdout_pollfds[..], - wal_redo_timeout.as_millis() as i32, - ) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If we have some data in stdout, read it to the result buffer. 
- let out_revents = stdout_pollfds[0].revents().unwrap(); - if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - nresult += output.stdout.read(&mut resultbuf[nresult..])?; - } - if out_revents.contains(PollFlags::POLLHUP) { - anyhow::bail!("WAL redo process closed its stdout unexpectedly"); - } - } - output - .pending_responses - .push_back(Some(Bytes::from(resultbuf))); - } - // Replace our request's response with None in `pending_responses`. - // Then make space in the ring buffer by clearing out any seqence of contiguous - // `None`'s from the front of `pending_responses`. - // NB: We can't pop_front() because other requests' responses because another - // requester might have grabbed the output mutex before us: - // T1: grab input mutex - // T1: send request_no 23 - // T1: release input mutex - // T2: grab input mutex - // T2: send request_no 24 - // T2: release input mutex - // T2: grab output mutex - // T2: n_processed_responses + output.pending_responses.len() <= request_no - // 23 0 24 - // T2: enters poll loop that reads stdout - // T2: put response for 23 into pending_responses - // T2: put response for 24 into pending_resposnes - // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back - // T2: takes its response_24 - // pending_responses now looks like this: Front Some(response_23) None Back - // T2: does the while loop below - // pending_responses now looks like this: Front Some(response_23) None Back - // T2: releases output mutex - // T1: grabs output mutex - // T1: n_processed_responses + output.pending_responses.len() > request_no - // 23 2 23 - // T1: skips poll loop that reads stdout - // T1: takes its response_23 - // pending_responses now looks like this: Front None None Back - // T2: does the while loop below - // pending_responses now looks like this: Front Back - // n_processed_responses now has value 25 - let res = output.pending_responses[request_no - n_processed_responses] - 
.take() - .expect("we own this request_no, nobody else is supposed to take it"); - while let Some(front) = output.pending_responses.front() { - if front.is_none() { - output.pending_responses.pop_front(); - output.n_processed_responses += 1; - } else { - break; - } - } - Ok(res) - } - - #[cfg(feature = "testing")] - fn record_and_log(&self, writebuf: &[u8]) { - let millis = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(); - - let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed); - - // these files will be collected to an allure report - let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len()); - - let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename); - - let res = std::fs::OpenOptions::new() - .write(true) - .create_new(true) - .read(true) - .open(path) - .and_then(|mut f| f.write_all(writebuf)); - - // trip up allowed_errors - if let Err(e) = res { - tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}"); - } else { - tracing::error!(filename, "erroring walredo input saved"); - } - } - - #[cfg(not(feature = "testing"))] - fn record_and_log(&self, _: &[u8]) {} -} - -impl Drop for WalRedoProcess { - fn drop(&mut self) { - self.child - .take() - .expect("we only do this once") - .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop); - // no way to wait for stderr_logger_task from Drop because that is async only - } -} - -/// Wrapper type around `std::process::Child` which guarantees that the child -/// will be killed and waited-for by this process before being dropped. 
-struct NoLeakChild { - tenant_id: TenantShardId, - child: Option, -} - -impl Deref for NoLeakChild { - type Target = Child; - - fn deref(&self) -> &Self::Target { - self.child.as_ref().expect("must not use from drop") - } -} - -impl DerefMut for NoLeakChild { - fn deref_mut(&mut self) -> &mut Self::Target { - self.child.as_mut().expect("must not use from drop") - } -} - -impl NoLeakChild { - fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result { - let child = command.spawn()?; - Ok(NoLeakChild { - tenant_id, - child: Some(child), - }) - } - - fn kill_and_wait(mut self, cause: WalRedoKillCause) { - let child = match self.child.take() { - Some(child) => child, - None => return, - }; - Self::kill_and_wait_impl(child, cause); - } - - #[instrument(skip_all, fields(pid=child.id(), ?cause))] - fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) { - scopeguard::defer! { - WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc(); - } - let res = child.kill(); - if let Err(e) = res { - // This branch is very unlikely because: - // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it. - // - This is the only place that calls .kill() - // - We consume `self`, so, .kill() can't be called twice. - // - If the process exited by itself or was killed by someone else, - // .kill() will still succeed because we haven't wait()'ed yet. - // - // So, if we arrive here, we have really no idea what happened, - // whether the PID stored in self.child is still valid, etc. - // If this function were fallible, we'd return an error, but - // since it isn't, all we can do is log an error and proceed - // with the wait(). 
- error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process"); - } - - match child.wait() { - Ok(exit_status) => { - info!(exit_status = %exit_status, "wait successful"); - } - Err(e) => { - error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)"); - } - } - } -} - -impl Drop for NoLeakChild { - fn drop(&mut self) { - let child = match self.child.take() { - Some(child) => child, - None => return, - }; - let tenant_shard_id = self.tenant_id; - // Offload the kill+wait of the child process into the background. - // If someone stops the runtime, we'll leak the child process. - // We can ignore that case because we only stop the runtime on pageserver exit. - tokio::runtime::Handle::current().spawn(async move { - tokio::task::spawn_blocking(move || { - // Intentionally don't inherit the tracing context from whoever is dropping us. - // This thread here is going to outlive of our dropper. - let span = tracing::info_span!( - "walredo", - tenant_id = %tenant_shard_id.tenant_id, - shard_id = %tenant_shard_id.shard_slug() - ); - let _entered = span.enter(); - Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop); - }) - .await - }); - } -} - -trait NoLeakChildCommandExt { - fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result; -} - -impl NoLeakChildCommandExt for Command { - fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result { - NoLeakChild::spawn(tenant_id, self) - } -} - -// Functions for constructing messages to send to the postgres WAL redo -// process. See pgxn/neon_walredo/walredoproc.c for -// explanation of the protocol. 
- -fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec) { - let len = 4 + 1 + 4 * 4; - - buf.put_u8(b'B'); - buf.put_u32(len as u32); - - tag.ser_into(buf) - .expect("serialize BufferTag should always succeed"); -} - -fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { - assert!(base_img.len() == 8192); - - let len = 4 + 1 + 4 * 4 + base_img.len(); - - buf.put_u8(b'P'); - buf.put_u32(len as u32); - tag.ser_into(buf) - .expect("serialize BufferTag should always succeed"); - buf.put(base_img); -} - -fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { - let len = 4 + 8 + rec.len(); - - buf.put_u8(b'A'); - buf.put_u32(len as u32); - buf.put_u64(endlsn.0); - buf.put(rec); -} - -fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { - let len = 4 + 1 + 4 * 4; - - buf.put_u8(b'G'); - buf.put_u32(len as u32); - tag.ser_into(buf) - .expect("serialize BufferTag should always succeed"); -} - #[cfg(test)] mod tests { use super::PostgresRedoManager; diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs new file mode 100644 index 0000000000..52899349c4 --- /dev/null +++ b/pageserver/src/walredo/apply_neon.rs @@ -0,0 +1,235 @@ +use crate::walrecord::NeonWalRecord; +use anyhow::Context; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::BytesMut; +use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key}; +use pageserver_api::reltag::SlruKind; +use postgres_ffi::pg_constants; +use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; +use postgres_ffi::v14::nonrelfile_utils::{ + mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, + transaction_id_set_status, +}; +use postgres_ffi::BLCKSZ; +use tracing::*; + +/// Can this request be served by neon redo functions +/// or we need to pass it to wal-redo postgres process? 
+pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool { + // Currently, we don't have bespoken Rust code to replay any + // Postgres WAL records. But everything else is handled in neon. + #[allow(clippy::match_like_matches_macro)] + match rec { + NeonWalRecord::Postgres { + will_init: _, + rec: _, + } => false, + _ => true, + } +} + +pub(crate) fn apply_in_neon( + record: &NeonWalRecord, + key: Key, + page: &mut BytesMut, +) -> Result<(), anyhow::Error> { + match record { + NeonWalRecord::Postgres { + will_init: _, + rec: _, + } => { + anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); + } + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno, + old_heap_blkno, + flags, + } => { + // sanity check that this is modifying the correct relation + let (rel, blknum) = key_to_rel_block(key).context("invalid record")?; + assert!( + rel.forknum == VISIBILITYMAP_FORKNUM, + "ClearVisibilityMapFlags record on unexpected rel {}", + rel + ); + if let Some(heap_blkno) = *new_heap_blkno { + // Calculate the VM block and offset that corresponds to the heap block. + let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); + let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); + let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); + + // Check that we're modifying the correct VM block. 
+ assert!(map_block == blknum); + + // equivalent to PageGetContents(page) + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + + map[map_byte as usize] &= !(flags << map_offset); + } + + // Repeat for 'old_heap_blkno', if any + if let Some(heap_blkno) = *old_heap_blkno { + let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); + let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); + let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); + + assert!(map_block == blknum); + + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + + map[map_byte as usize] &= !(flags << map_offset); + } + } + // Non-relational WAL records are handled here, with custom code that has the + // same effects as the corresponding Postgres WAL redo function. + NeonWalRecord::ClogSetCommitted { xids, timestamp } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::Clog, + "ClogSetCommitted record with unexpected key {}", + key + ); + for &xid in xids { + let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. 
+ assert!( + segno == expected_segno, + "ClogSetCommitted record for XID {} with unexpected key {}", + xid, + key + ); + assert!( + blknum == expected_blknum, + "ClogSetCommitted record for XID {} with unexpected key {}", + xid, + key + ); + + transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page); + } + + // Append the timestamp + if page.len() == BLCKSZ as usize + 8 { + page.truncate(BLCKSZ as usize); + } + if page.len() == BLCKSZ as usize { + page.extend_from_slice(×tamp.to_be_bytes()); + } else { + warn!( + "CLOG blk {} in seg {} has invalid size {}", + blknum, + segno, + page.len() + ); + } + } + NeonWalRecord::ClogSetAborted { xids } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::Clog, + "ClogSetAborted record with unexpected key {}", + key + ); + for &xid in xids { + let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. + assert!( + segno == expected_segno, + "ClogSetAborted record for XID {} with unexpected key {}", + xid, + key + ); + assert!( + blknum == expected_blknum, + "ClogSetAborted record for XID {} with unexpected key {}", + xid, + key + ); + + transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page); + } + } + NeonWalRecord::MultixactOffsetCreate { mid, moff } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::MultiXactOffsets, + "MultixactOffsetCreate record with unexpected key {}", + key + ); + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. 
+ let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let offset = (entryno * 4) as usize; + + // Check that we're modifying the correct multixact-offsets block. + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + segno == expected_segno, + "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", + mid, + key + ); + assert!( + blknum == expected_blknum, + "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", + mid, + key + ); + + LittleEndian::write_u32(&mut page[offset..offset + 4], *moff); + } + NeonWalRecord::MultixactMembersCreate { moff, members } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::MultiXactMembers, + "MultixactMembersCreate record with unexpected key {}", + key + ); + for (i, member) in members.iter().enumerate() { + let offset = moff + i as u32; + + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + + // Check that we're modifying the correct multixact-members block. 
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + segno == expected_segno, + "MultiXactMembersCreate record for offset {} with unexpected key {}", + moff, + key + ); + assert!( + blknum == expected_blknum, + "MultiXactMembersCreate record for offset {} with unexpected key {}", + moff, + key + ); + + let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); + flagsval |= member.status << bshift; + LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval); + LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); + } + } + } + Ok(()) +} diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs new file mode 100644 index 0000000000..85db3b4a4a --- /dev/null +++ b/pageserver/src/walredo/process.rs @@ -0,0 +1,406 @@ +use self::no_leak_child::NoLeakChild; +use crate::{ + config::PageServerConf, + metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, + walrecord::NeonWalRecord, +}; +use anyhow::Context; +use bytes::Bytes; +use nix::poll::{PollFd, PollFlags}; +use pageserver_api::{reltag::RelTag, shard::TenantShardId}; +use postgres_ffi::BLCKSZ; +use std::os::fd::AsRawFd; +#[cfg(feature = "testing")] +use std::sync::atomic::AtomicUsize; +use std::{ + collections::VecDeque, + io::{Read, Write}, + process::{ChildStdin, ChildStdout, Command, Stdio}, + sync::{Mutex, MutexGuard}, + time::Duration, +}; +use tracing::{debug, error, instrument, Instrument}; +use utils::{lsn::Lsn, nonblock::set_nonblock}; + +mod no_leak_child; +/// The IPC protocol that pageserver and walredo process speak over their shared pipe. +mod protocol; + +pub struct WalRedoProcess { + #[allow(dead_code)] + conf: &'static PageServerConf, + tenant_shard_id: TenantShardId, + // Some() on construction, only becomes None on Drop. 
+ child: Option, + stdout: Mutex, + stdin: Mutex, + /// Counter to separate same sized walredo inputs failing at the same millisecond. + #[cfg(feature = "testing")] + dump_sequence: AtomicUsize, +} + +struct ProcessInput { + stdin: ChildStdin, + n_requests: usize, +} + +struct ProcessOutput { + stdout: ChildStdout, + pending_responses: VecDeque>, + n_processed_responses: usize, +} + +impl WalRedoProcess { + // + // Start postgres binary in special WAL redo mode. + // + #[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))] + pub(crate) fn launch( + conf: &'static PageServerConf, + tenant_shard_id: TenantShardId, + pg_version: u32, + ) -> anyhow::Result { + let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. + let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?; + + use no_leak_child::NoLeakChildCommandExt; + // Start postgres itself + let child = Command::new(pg_bin_dir_path.join("postgres")) + // the first arg must be --wal-redo so the child process enters into walredo mode + .arg("--wal-redo") + // the child doesn't process this arg, but, having it in the argv helps indentify the + // walredo process for a particular tenant when debugging a pagserver + .args(["--tenant-shard-id", &format!("{tenant_shard_id}")]) + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .env_clear() + .env("LD_LIBRARY_PATH", &pg_lib_dir_path) + .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) + // NB: The redo process is not trusted after we sent it the first + // walredo work. Before that, it is trusted. Specifically, we trust + // it to + // 1. close all file descriptors except stdin, stdout, stderr because + // pageserver might not be 100% diligent in setting FD_CLOEXEC on all + // the files it opens, and + // 2. to use seccomp to sandbox itself before processing the first + // walredo request. 
+ .spawn_no_leak_child(tenant_shard_id) + .context("spawn process")?; + WAL_REDO_PROCESS_COUNTERS.started.inc(); + let mut child = scopeguard::guard(child, |child| { + error!("killing wal-redo-postgres process due to a problem during launch"); + child.kill_and_wait(WalRedoKillCause::Startup); + }); + + let stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + let stderr = tokio::process::ChildStderr::from_std(stderr) + .context("convert to tokio::ChildStderr")?; + macro_rules! set_nonblock_or_log_err { + ($file:ident) => {{ + let res = set_nonblock($file.as_raw_fd()); + if let Err(e) = &res { + error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed"); + } + res + }}; + } + set_nonblock_or_log_err!(stdin)?; + set_nonblock_or_log_err!(stdout)?; + + // all fallible operations post-spawn are complete, so get rid of the guard + let child = scopeguard::ScopeGuard::into_inner(child); + + tokio::spawn( + async move { + scopeguard::defer! { + debug!("wal-redo-postgres stderr_logger_task finished"); + crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc(); + } + debug!("wal-redo-postgres stderr_logger_task started"); + crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc(); + + use tokio::io::AsyncBufReadExt; + let mut stderr_lines = tokio::io::BufReader::new(stderr); + let mut buf = Vec::new(); + let res = loop { + buf.clear(); + // TODO we don't trust the process to cap its stderr length. + // Currently it can do unbounded Vec allocation. 
+ match stderr_lines.read_until(b'\n', &mut buf).await { + Ok(0) => break Ok(()), // eof + Ok(num_bytes) => { + let output = String::from_utf8_lossy(&buf[..num_bytes]); + error!(%output, "received output"); + } + Err(e) => { + break Err(e); + } + } + }; + match res { + Ok(()) => (), + Err(e) => { + error!(error=?e, "failed to read from walredo stderr"); + } + } + }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) + ); + + Ok(Self { + conf, + tenant_shard_id, + child: Some(child), + stdin: Mutex::new(ProcessInput { + stdin, + n_requests: 0, + }), + stdout: Mutex::new(ProcessOutput { + stdout, + pending_responses: VecDeque::new(), + n_processed_responses: 0, + }), + #[cfg(feature = "testing")] + dump_sequence: AtomicUsize::default(), + }) + } + + pub(crate) fn id(&self) -> u32 { + self.child + .as_ref() + .expect("must not call this during Drop") + .id() + } + + // Apply given WAL records ('records') over an old page image. Returns + // new page image. + // + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] + pub(crate) fn apply_wal_records( + &self, + rel: RelTag, + blknum: u32, + base_img: &Option, + records: &[(Lsn, NeonWalRecord)], + wal_redo_timeout: Duration, + ) -> anyhow::Result { + let tag = protocol::BufferTag { rel, blknum }; + let input = self.stdin.lock().unwrap(); + + // Serialize all the messages to send the WAL redo process first. + // + // This could be problematic if there are millions of records to replay, + // but in practice the number of records is usually so small that it doesn't + // matter, and it's better to keep this code simple. + // + // Most requests start with a before-image with BLCKSZ bytes, followed by + // by some other WAL records. Start with a buffer that can hold that + // comfortably. 
+ let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); + protocol::build_begin_redo_for_block_msg(tag, &mut writebuf); + if let Some(img) = base_img { + protocol::build_push_page_msg(tag, img, &mut writebuf); + } + for (lsn, rec) in records.iter() { + if let NeonWalRecord::Postgres { + will_init: _, + rec: postgres_rec, + } = rec + { + protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); + } else { + anyhow::bail!("tried to pass neon wal record to postgres WAL redo"); + } + } + protocol::build_get_page_msg(tag, &mut writebuf); + WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); + + let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout); + + if res.is_err() { + // not all of these can be caused by this particular input, however these are so rare + // in tests so capture all. + self.record_and_log(&writebuf); + } + + res + } + + fn apply_wal_records0( + &self, + writebuf: &[u8], + input: MutexGuard, + wal_redo_timeout: Duration, + ) -> anyhow::Result { + let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small. + let mut nwrite = 0usize; + + while nwrite < writebuf.len() { + let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)]; + let n = loop { + match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) { + Err(nix::errno::Errno::EINTR) => continue, + res => break res, + } + }?; + + if n == 0 { + anyhow::bail!("WAL redo timed out"); + } + + // If 'stdin' is writeable, do write. + let in_revents = stdin_pollfds[0].revents().unwrap(); + if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { + nwrite += proc.stdin.write(&writebuf[nwrite..])?; + } + if in_revents.contains(PollFlags::POLLHUP) { + // We still have more data to write, but the process closed the pipe. 
+ anyhow::bail!("WAL redo process closed its stdin unexpectedly"); + } + } + let request_no = proc.n_requests; + proc.n_requests += 1; + drop(proc); + + // To improve walredo performance we separate sending requests and receiving + // responses. Them are protected by different mutexes (output and input). + // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process + // then there is not warranty that T1 will first granted output mutex lock. + // To address this issue we maintain number of sent requests, number of processed + // responses and ring buffer with pending responses. After sending response + // (under input mutex), threads remembers request number. Then it releases + // input mutex, locks output mutex and fetch in ring buffer all responses until + // its stored request number. The it takes correspondent element from + // pending responses ring buffer and truncate all empty elements from the front, + // advancing processed responses number. + + let mut output = self.stdout.lock().unwrap(); + let n_processed_responses = output.n_processed_responses; + while n_processed_responses + output.pending_responses.len() <= request_no { + // We expect the WAL redo process to respond with an 8k page image. We read it + // into this buffer. + let mut resultbuf = vec![0; BLCKSZ.into()]; + let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far + while nresult < BLCKSZ.into() { + let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)]; + // We do two things simultaneously: reading response from stdout + // and forward any logging information that the child writes to its stderr to the page server's log. + let n = loop { + match nix::poll::poll( + &mut stdout_pollfds[..], + wal_redo_timeout.as_millis() as i32, + ) { + Err(nix::errno::Errno::EINTR) => continue, + res => break res, + } + }?; + + if n == 0 { + anyhow::bail!("WAL redo timed out"); + } + + // If we have some data in stdout, read it to the result buffer. 
+ let out_revents = stdout_pollfds[0].revents().unwrap(); + if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { + nresult += output.stdout.read(&mut resultbuf[nresult..])?; + } + if out_revents.contains(PollFlags::POLLHUP) { + anyhow::bail!("WAL redo process closed its stdout unexpectedly"); + } + } + output + .pending_responses + .push_back(Some(Bytes::from(resultbuf))); + } + // Replace our request's response with None in `pending_responses`. + // Then make space in the ring buffer by clearing out any seqence of contiguous + // `None`'s from the front of `pending_responses`. + // NB: We can't pop_front() because other requests' responses because another + // requester might have grabbed the output mutex before us: + // T1: grab input mutex + // T1: send request_no 23 + // T1: release input mutex + // T2: grab input mutex + // T2: send request_no 24 + // T2: release input mutex + // T2: grab output mutex + // T2: n_processed_responses + output.pending_responses.len() <= request_no + // 23 0 24 + // T2: enters poll loop that reads stdout + // T2: put response for 23 into pending_responses + // T2: put response for 24 into pending_resposnes + // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back + // T2: takes its response_24 + // pending_responses now looks like this: Front Some(response_23) None Back + // T2: does the while loop below + // pending_responses now looks like this: Front Some(response_23) None Back + // T2: releases output mutex + // T1: grabs output mutex + // T1: n_processed_responses + output.pending_responses.len() > request_no + // 23 2 23 + // T1: skips poll loop that reads stdout + // T1: takes its response_23 + // pending_responses now looks like this: Front None None Back + // T2: does the while loop below + // pending_responses now looks like this: Front Back + // n_processed_responses now has value 25 + let res = output.pending_responses[request_no - n_processed_responses] + 
.take() + .expect("we own this request_no, nobody else is supposed to take it"); + while let Some(front) = output.pending_responses.front() { + if front.is_none() { + output.pending_responses.pop_front(); + output.n_processed_responses += 1; + } else { + break; + } + } + Ok(res) + } + + #[cfg(feature = "testing")] + fn record_and_log(&self, writebuf: &[u8]) { + use std::sync::atomic::Ordering; + + let millis = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis(); + + let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed); + + // these files will be collected to an allure report + let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len()); + + let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename); + + let res = std::fs::OpenOptions::new() + .write(true) + .create_new(true) + .read(true) + .open(path) + .and_then(|mut f| f.write_all(writebuf)); + + // trip up allowed_errors + if let Err(e) = res { + tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}"); + } else { + tracing::error!(filename, "erroring walredo input saved"); + } + } + + #[cfg(not(feature = "testing"))] + fn record_and_log(&self, _: &[u8]) {} +} + +impl Drop for WalRedoProcess { + fn drop(&mut self) { + self.child + .take() + .expect("we only do this once") + .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop); + // no way to wait for stderr_logger_task from Drop because that is async only + } +} diff --git a/pageserver/src/walredo/process/no_leak_child.rs b/pageserver/src/walredo/process/no_leak_child.rs new file mode 100644 index 0000000000..ca016408e6 --- /dev/null +++ b/pageserver/src/walredo/process/no_leak_child.rs @@ -0,0 +1,126 @@ +use tracing; +use tracing::error; +use tracing::info; +use tracing::instrument; + +use crate::metrics::WalRedoKillCause; +use crate::metrics::WAL_REDO_PROCESS_COUNTERS; + +use std::io; +use std::process::Command; + 
+use std::ops::DerefMut; + +use std::ops::Deref; + +use std::process::Child; + +use pageserver_api::shard::TenantShardId; + +/// Wrapper type around `std::process::Child` which guarantees that the child +/// will be killed and waited-for by this process before being dropped. +pub(crate) struct NoLeakChild { + pub(crate) tenant_id: TenantShardId, + pub(crate) child: Option, +} + +impl Deref for NoLeakChild { + type Target = Child; + + fn deref(&self) -> &Self::Target { + self.child.as_ref().expect("must not use from drop") + } +} + +impl DerefMut for NoLeakChild { + fn deref_mut(&mut self) -> &mut Self::Target { + self.child.as_mut().expect("must not use from drop") + } +} + +impl NoLeakChild { + pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result { + let child = command.spawn()?; + Ok(NoLeakChild { + tenant_id, + child: Some(child), + }) + } + + pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) { + let child = match self.child.take() { + Some(child) => child, + None => return, + }; + Self::kill_and_wait_impl(child, cause); + } + + #[instrument(skip_all, fields(pid=child.id(), ?cause))] + pub(crate) fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) { + scopeguard::defer! { + WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc(); + } + let res = child.kill(); + if let Err(e) = res { + // This branch is very unlikely because: + // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it. + // - This is the only place that calls .kill() + // - We consume `self`, so, .kill() can't be called twice. + // - If the process exited by itself or was killed by someone else, + // .kill() will still succeed because we haven't wait()'ed yet. + // + // So, if we arrive here, we have really no idea what happened, + // whether the PID stored in self.child is still valid, etc. 
+ // If this function were fallible, we'd return an error, but + // since it isn't, all we can do is log an error and proceed + // with the wait(). + error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process"); + } + + match child.wait() { + Ok(exit_status) => { + info!(exit_status = %exit_status, "wait successful"); + } + Err(e) => { + error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)"); + } + } + } +} + +impl Drop for NoLeakChild { + fn drop(&mut self) { + let child = match self.child.take() { + Some(child) => child, + None => return, + }; + let tenant_shard_id = self.tenant_id; + // Offload the kill+wait of the child process into the background. + // If someone stops the runtime, we'll leak the child process. + // We can ignore that case because we only stop the runtime on pageserver exit. + tokio::runtime::Handle::current().spawn(async move { + tokio::task::spawn_blocking(move || { + // Intentionally don't inherit the tracing context from whoever is dropping us. + // This thread here is going to outlive of our dropper. 
+ let span = tracing::info_span!( + "walredo", + tenant_id = %tenant_shard_id.tenant_id, + shard_id = %tenant_shard_id.shard_slug() + ); + let _entered = span.enter(); + Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop); + }) + .await + }); + } +} + +pub(crate) trait NoLeakChildCommandExt { + fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result; +} + +impl NoLeakChildCommandExt for Command { + fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result { + NoLeakChild::spawn(tenant_id, self) + } +} diff --git a/pageserver/src/walredo/process/protocol.rs b/pageserver/src/walredo/process/protocol.rs new file mode 100644 index 0000000000..b703344cc8 --- /dev/null +++ b/pageserver/src/walredo/process/protocol.rs @@ -0,0 +1,57 @@ +use bytes::BufMut; +use pageserver_api::reltag::RelTag; +use serde::Serialize; +use utils::bin_ser::BeSer; +use utils::lsn::Lsn; + +/// +/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. +/// +/// In Postgres `BufferTag` structure is used for exactly the same purpose. +/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91). 
+/// +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)] +pub(crate) struct BufferTag { + pub rel: RelTag, + pub blknum: u32, +} + +pub(crate) fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec) { + let len = 4 + 1 + 4 * 4; + + buf.put_u8(b'B'); + buf.put_u32(len as u32); + + tag.ser_into(buf) + .expect("serialize BufferTag should always succeed"); +} + +pub(crate) fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { + assert!(base_img.len() == 8192); + + let len = 4 + 1 + 4 * 4 + base_img.len(); + + buf.put_u8(b'P'); + buf.put_u32(len as u32); + tag.ser_into(buf) + .expect("serialize BufferTag should always succeed"); + buf.put(base_img); +} + +pub(crate) fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { + let len = 4 + 8 + rec.len(); + + buf.put_u8(b'A'); + buf.put_u32(len as u32); + buf.put_u64(endlsn.0); + buf.put(rec); +} + +pub(crate) fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { + let len = 4 + 1 + 4 * 4; + + buf.put_u8(b'G'); + buf.put_u32(len as u32); + tag.ser_into(buf) + .expect("serialize BufferTag should always succeed"); +}