diff --git a/Cargo.lock b/Cargo.lock index 483c5d535b..503837bf09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,9 +180,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitvec" @@ -338,7 +338,7 @@ dependencies = [ "bytes", "hex", "lazy_static", - "nix", + "nix 0.20.0", "pageserver", "postgres", "postgres_ffi", @@ -886,9 +886,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.101" +version = "0.2.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" +checksum = "f98a04dce437184842841303488f70d0188c5f51437d2a834dc097eafa909a01" [[package]] name = "libloading" @@ -1052,6 +1052,19 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" +dependencies = [ + "bitflags", + "cc", + "cfg-if 1.0.0", + "libc", + "memoffset", +] + [[package]] name = "nom" version = "6.1.2" @@ -1190,6 +1203,7 @@ dependencies = [ "lazy_static", "log", "lz4_flex", + "nix 0.23.0", "parking_lot", "postgres", "postgres-protocol", @@ -2620,6 +2634,7 @@ dependencies = [ "jsonwebtoken", "lazy_static", "log", + "nix 0.23.0", "postgres", "rand", "routerify", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 9c243c6881..0f707e9d2d 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,6 +37,7 @@ async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } +nix = "0.23" #yakv = { path = "../../yakv" } yakv = "0.2.7" lz4_flex = "0.9.0" diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index abade0e383..af2b6f9d0a 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -17,7 +17,7 @@ use yakv::storage::{ }; const TOAST_SEGMENT_SIZE: usize = 2 * 1024; -const CACHE_SIZE: usize = 32 * 1024; // 256Mb +const CACHE_SIZE: usize = 1024; // 8Mb /// /// Toast storage consistof two KV databases: one for storing main index diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 6d3b4233ef..2a46ff618b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,24 +22,26 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; -use rand::Rng; +use nix::poll::*; use serde::Serialize; use std::fs; use std::fs::OpenOptions; use std::io::prelude::*; -use std::io::Error; +use std::io::{Error, ErrorKind}; +use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::process::Stdio; +use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender, SyncSender}; use std::sync::Mutex; use std::time::Duration; use std::time::Instant; -use tokio::io::AsyncBufReadExt; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::process::{ChildStdin, ChildStdout, Command}; -use tokio::time::timeout; use zenith_metrics::{register_histogram, register_int_counter, Histogram, 
IntCounter};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
+use zenith_utils::nonblock::set_nonblock;
 use zenith_utils::zid::ZTenantId;
 
 use crate::relish::*;
@@ -54,7 +56,9 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
 use postgres_ffi::pg_constants;
 use postgres_ffi::XLogRecord;
 
-const WAL_REDO_WORKERS: usize = 1;
+const N_CHANNELS: usize = 16;
+const CHANNEL_SIZE: usize = 1024 * 1024;
+type ChannelId = usize;
 
 ///
 /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
@@ -133,17 +137,22 @@ lazy_static! {
 ///
 /// This is the real implementation that uses a Postgres process to
-/// perform WAL replay. Only one thread can use the processs at a time,
-/// that is controlled by the Mutex. In the future, we might want to
-/// launch a pool of processes to allow concurrent replay of multiple
-/// records.
+/// perform WAL replay. Requests from multiple threads are multiplexed
+/// through the `sender` channel and forwarded to the postgres wal-redo
+/// process pipe by a separate thread. Responses come back through a set
+/// of `receivers` channels, used in a round-robin manner. Each receiver
+/// channel is protected by a mutex so only one thread can use it at a time.
+/// In the future, we might want to launch a pool of processes to allow concurrent
+/// replay of multiple records.
 ///
 pub struct PostgresRedoManager {
-    tenantid: ZTenantId,
-    conf: &'static PageServerConf,
-
-    runtime: tokio::runtime::Runtime,
-    workers: [Mutex<Option<PostgresRedoProcess>>; WAL_REDO_WORKERS],
+    // multiplexor pipe: use a sync_channel to allow sharing the sender by
+    // multiple threads and to limit the size of the buffer
+    sender: SyncSender<(ChannelId, Vec<u8>)>,
+    // set of receiver channels
+    receivers: Vec<Mutex<Receiver<Bytes>>>,
+    // atomically incremented counter for choosing a receiver
+    round_robin: AtomicUsize,
 }
 
 #[derive(Debug)]
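[Editorial note: the channel fan-out described in the doc comment above can be hard to picture from the diff alone. The following standalone sketch is illustrative only; `Multiplexer` and its methods are made-up names, not part of this patch. It shows the same pattern: one bounded `sync_channel` shared by all callers feeding a single proxy thread, plus per-slot reply channels picked round-robin, with the slot's mutex held across send+recv so a reply cannot be claimed by the wrong caller.]

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};
use std::sync::Mutex;
use std::thread;

const N_CHANNELS: usize = 4;

struct Multiplexer {
    sender: SyncSender<(usize, Vec<u8>)>,      // shared, bounded request pipe
    receivers: Vec<Mutex<Receiver<Vec<u8>>>>,  // one reply channel per slot
    round_robin: AtomicUsize,
}

impl Multiplexer {
    fn new() -> Multiplexer {
        let (tx, rx) = mpsc::sync_channel(1024);
        let mut senders: Vec<Sender<Vec<u8>>> = Vec::new();
        let mut receivers = Vec::new();
        for _ in 0..N_CHANNELS {
            let (tx, rx) = mpsc::channel();
            senders.push(tx);
            receivers.push(Mutex::new(rx));
        }
        // The single worker plays the role of the wal-redo proxy thread;
        // here it just echoes the request back as a stand-in for redo.
        thread::spawn(move || {
            while let Ok((id, req)) = rx.recv() {
                senders[id].send(req).unwrap();
            }
        });
        Multiplexer {
            sender: tx,
            receivers,
            round_robin: AtomicUsize::new(0),
        }
    }

    fn call(&self, req: Vec<u8>) -> Vec<u8> {
        let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS;
        // Lock the reply slot *before* sending: the worker serves requests in
        // FIFO order, so the thread that sent first also receives first, and
        // another thread reusing the same slot must wait for the mutex.
        let rx = self.receivers[id].lock().unwrap();
        self.sender.send((id, req)).unwrap();
        rx.recv().unwrap()
    }
}
```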
@@ -156,6 +165,13 @@ struct WalRedoRequest {
     records: Vec<(Lsn, WALRecord)>,
 }
 
+impl WalRedoRequest {
+    // Can this request be served by the zenith redo functions,
+    // or does it need to be passed to the wal-redo postgres process?
+    fn can_apply_in_zenith(&self) -> bool {
+        !matches!(self.rel, RelishTag::Relation(_))
+    }
+}
 
 /// An error happened in WAL redo
 #[derive(Debug, thiserror::Error)]
 pub enum WalRedoError {
@@ -164,6 +180,8 @@
     #[error("cannot perform WAL redo now")]
     InvalidState,
+    #[error("cannot perform WAL redo for this request")]
+    InvalidRequest,
 }
 
 ///
@@ -184,10 +202,6 @@ impl WalRedoManager for PostgresRedoManager {
         base_img: Option<Bytes>,
         records: Vec<(Lsn, WALRecord)>,
     ) -> Result<Bytes, WalRedoError> {
-        let start_time;
-        let lock_time;
-        let end_time;
-
         let request = WalRedoRequest {
             rel,
             blknum,
@@ -195,30 +209,14 @@ impl WalRedoManager for PostgresRedoManager {
             base_img,
             records,
         };
-
-        start_time = Instant::now();
-        let result = {
-            let mut process_guard = self.workers[rand::thread_rng().gen_range(0..WAL_REDO_WORKERS)]
-                .lock()
-                .unwrap();
-            lock_time = Instant::now();
-
-            // launch the WAL redo process on first use
-            if process_guard.is_none() {
-                let p = self
-                    .runtime
-                    .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
-                *process_guard = Some(p);
-            }
-            let process = process_guard.as_mut().unwrap();
-
-            self.runtime
-                .block_on(self.handle_apply_request(process, &request))
+        let start_time = Instant::now();
+        let result = if request.can_apply_in_zenith() {
+            self.handle_apply_request_zenith(&request)
+        } else {
+            self.handle_apply_request_postgres(&request)
         };
-        end_time = Instant::now();
-
-        WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
-        WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64());
+        let end_time = Instant::now();
+        WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
 
         result
     }
@@ -228,32 +226,109 @@ impl PostgresRedoManager {
     ///
     /// Create a new PostgresRedoManager.
     ///
-    pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
-        // We block on waiting for requests on the walredo request channel, but
-        // use async I/O to communicate with the child process. Initialize the
-        // runtime for the async part.
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
+    pub fn new(conf: &PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
+        let (tx, rx): (
+            SyncSender<(ChannelId, Vec<u8>)>,
+            Receiver<(ChannelId, Vec<u8>)>,
+        ) = mpsc::sync_channel(CHANNEL_SIZE);
+        let mut senders: Vec<Sender<Bytes>> = Vec::with_capacity(N_CHANNELS);
+        let mut receivers: Vec<Mutex<Receiver<Bytes>>> = Vec::with_capacity(N_CHANNELS);
+        for _ in 0..N_CHANNELS {
+            let (tx, rx) = mpsc::channel();
+            senders.push(tx);
+            receivers.push(Mutex::new(rx));
+        }
+        if let Ok(mut proc) = PostgresRedoProcess::launch(conf, &tenantid) {
+            let _proxy = std::thread::spawn(move || loop {
+                let (id, data) = rx.recv().unwrap();
+                match proc.apply_wal_records(data) {
+                    Ok(page) => senders[id as usize].send(page).unwrap(),
+                    Err(err) => {
+                        info!("wal-redo failed with error {:?}", err);
+                        proc.kill();
+                        break;
+                    }
+                }
+            });
+            PostgresRedoManager {
+                sender: tx,
+                receivers,
+                round_robin: AtomicUsize::new(0),
+            }
+        } else {
+            panic!("Failed to launch wal-redo postgres");
+        }
+    }
 
-        // The actual process is launched lazily, on first request.
-        PostgresRedoManager {
-            runtime,
-            tenantid,
-            conf,
-            workers: [(); WAL_REDO_WORKERS].map(|_| Mutex::new(None)),
+    fn apply_wal_records(
+        &self,
+        tag: BufferTag,
+        base_img: Option<Bytes>,
+        records: &[(Lsn, WALRecord)],
+    ) -> Result<Bytes, Error> {
+        // Serialize all the messages to send to the WAL redo process first.
+        //
+        // This could be problematic if there are millions of records to replay,
+        // but in practice the number of records is usually so small that it doesn't
+        // matter, and it's better to keep this code simple.
+        let mut writebuf: Vec<u8> = Vec::new();
+        build_begin_redo_for_block_msg(tag, &mut writebuf);
+        if let Some(img) = base_img {
+            build_push_page_msg(tag, &img, &mut writebuf);
+        }
+        for (lsn, rec) in records.iter() {
+            build_apply_record_msg(*lsn, &rec.rec, &mut writebuf);
+        }
+        build_get_page_msg(tag, &mut writebuf);
+        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
+
+        let id = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_CHANNELS;
+        let rx = self.receivers[id].lock().unwrap();
+        self.sender.send((id, writebuf)).unwrap();
+        Ok(rx.recv().unwrap())
+    }
+
+    ///
+    /// Process one request for WAL redo using wal-redo postgres
+    ///
+    fn handle_apply_request_postgres(
+        &self,
+        request: &WalRedoRequest,
+    ) -> Result<Bytes, WalRedoError> {
+        let blknum = request.blknum;
+        let lsn = request.lsn;
+        let base_img = request.base_img.clone();
+        let records = &request.records;
+        let nrecords = records.len();
+
+        let start = Instant::now();
+
+        let apply_result: Result<Bytes, Error>;
+
+        if let RelishTag::Relation(rel) = request.rel {
+            // Relational WAL records are applied using wal-redo-postgres
+            let buf_tag = BufferTag { rel, blknum };
+            apply_result = self.apply_wal_records(buf_tag, base_img, records);
+
+            let duration = start.elapsed();
+
+            debug!(
+                "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
+                nrecords,
+                duration.as_micros(),
+                lsn
+            );
+
+            apply_result.map_err(WalRedoError::IoError)
+        } else {
+            Err(WalRedoError::InvalidRequest)
         }
     }
 
     ///
-    /// Process one request for WAL redo.
+    /// Process one request for WAL redo using custom zenith code
     ///
-    async fn handle_apply_request(
-        &self,
-        process: &mut PostgresRedoProcess,
-        request: &WalRedoRequest,
-    ) -> Result<Bytes, WalRedoError> {
+    fn handle_apply_request_zenith(&self, request: &WalRedoRequest) -> Result<Bytes, WalRedoError> {
         let rel = request.rel;
         let blknum = request.blknum;
         let lsn = request.lsn;
@@ -265,178 +340,158 @@ impl PostgresRedoManager {
         let start = Instant::now();
 
         let apply_result: Result<Bytes, Error>;
-        if let RelishTag::Relation(rel) = rel {
-            // Relational WAL records are applied using wal-redo-postgres
-            let buf_tag = BufferTag { rel, blknum };
-            apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
+
+        // Non-relational WAL records are handled here, with custom code that has the
+        // same effects as the corresponding Postgres WAL redo function.
+        const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
+        let mut page = BytesMut::new();
+        if let Some(fpi) = base_img {
+            // If full-page image is provided, then use it...
+            page.extend_from_slice(&fpi[..]);
         } else {
-            // Non-relational WAL records are handled here, with custom code that has the
-            // same effects as the corresponding Postgres WAL redo function.
-            const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
-            let mut page = BytesMut::new();
-            if let Some(fpi) = base_img {
-                // If full-page image is provided, then use it...
-                page.extend_from_slice(&fpi[..]);
-            } else {
-                // otherwise initialize page with zeros
-                page.extend_from_slice(&ZERO_PAGE);
+            // otherwise initialize page with zeros
+            page.extend_from_slice(&ZERO_PAGE);
+        }
+        // Apply all collected WAL records
+        for (_lsn, record) in records {
+            let mut buf = record.rec.clone();
+
+            WAL_REDO_RECORD_COUNTER.inc();
+
+            // 1. Parse XLogRecord struct
+            // FIXME: refactor to avoid code duplication.
+ let xlogrec = XLogRecord::from_bytes(&mut buf); + + //move to main data + // TODO probably, we should store some records in our special format + // to avoid this weird parsing on replay + let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; + if buf.remaining() > skip { + buf.advance(skip); } - // Apply all collected WAL records - for (_lsn, record) in records { - let mut buf = record.rec.clone(); - WAL_REDO_RECORD_COUNTER.inc(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. - let xlogrec = XLogRecord::from_bytes(&mut buf); - - //move to main data - // TODO probably, we should store some records in our special format - // to avoid this weird parsing on replay - let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; - if buf.remaining() > skip { - buf.advance(skip); - } - - if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { - // Transaction manager stuff - let rec_segno = match rel { - RelishTag::Slru { slru, segno } => { - assert!( - slru == SlruKind::Clog, - "Not valid XACT relish tag {:?}", - rel + if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + // Transaction manager stuff + let rec_segno = match rel { + RelishTag::Slru { slru, segno } => { + assert!( + slru == SlruKind::Clog, + "Not valid XACT relish tag {:?}", + rel + ); + segno + } + _ => panic!("Not valid XACT relish tag {:?}", rel), + }; + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); + if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT + || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED + { + transaction_id_set_status( + parsed_xact.xid, + pg_constants::TRANSACTION_STATUS_COMMITTED, + &mut page, + ); + for subxact in &parsed_xact.subxacts { + let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + // only update xids on the requested page + if rec_segno == segno && blknum == rpageno { + transaction_id_set_status( + *subxact, + pg_constants::TRANSACTION_STATUS_COMMITTED, + &mut page, ); - segno - } - _ => panic!("Not valid XACT relish tag {:?}", rel), - }; - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); - if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT - || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED - { - transaction_id_set_status( - parsed_xact.xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - for subxact in &parsed_xact.subxacts { - let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *subxact, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - } - } - } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT - || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - transaction_id_set_status( - parsed_xact.xid, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - for subxact in &parsed_xact.subxacts { - let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && 
blknum == rpageno { - transaction_id_set_status( - *subxact, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - } } } - } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - // Multixact operations - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - if let RelishTag::Slru { - slru, - segno: rec_segno, - } = rel - { - if slru == SlruKind::MultiXactMembers { - for i in 0..xlrec.nmembers { - let pageno = - i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - if segno == rec_segno && rpageno == blknum { - // update only target block - let offset = xlrec.moff + i; - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - let mut flagsval = - LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= !(((1 - << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - - 1) + } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT + || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + transaction_id_set_status( + parsed_xact.xid, + pg_constants::TRANSACTION_STATUS_ABORTED, + &mut page, + ); + for subxact in &parsed_xact.subxacts { + let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + // only update xids on the requested page + if rec_segno == segno && blknum == rpageno { + transaction_id_set_status( + *subxact, + pg_constants::TRANSACTION_STATUS_ABORTED, + &mut page, + ); + } + } + } + } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { + // Multixact operations + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + if let RelishTag::Slru { + slru, + segno: rec_segno, + } = rel + { + if slru == SlruKind::MultiXactMembers { + for i in 0..xlrec.nmembers { + let pageno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + if segno == rec_segno && rpageno == blknum { + // update only target block + let offset = xlrec.moff + i; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + let mut flagsval = + LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= + !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); - flagsval |= xlrec.members[i as usize].status << bshift; - LittleEndian::write_u32( - &mut page[flagsoff..flagsoff + 4], - flagsval, - ); - LittleEndian::write_u32( - &mut page[memberoff..memberoff + 4], - xlrec.members[i as usize].xid, - ); - } + flagsval |= xlrec.members[i as usize].status << bshift; + LittleEndian::write_u32( + &mut page[flagsoff..flagsoff + 4], + flagsval, + ); + LittleEndian::write_u32( + &mut page[memberoff..memberoff + 4], + xlrec.members[i as usize].xid, + ); } - } else { - // Multixact offsets SLRU - let offs = (xlrec.mid - % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 - * 4) as usize; - LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); } } else { - 
panic!();
+                        // Multixact offsets SLRU
+                        let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
+                            * 4) as usize;
+                        LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
                     }
                 } else {
                     panic!();
                 }
+            } else {
+                panic!();
+            }
         }
-
-            apply_result = Ok::<Bytes, Error>(page.freeze());
-        }
+        apply_result = Ok::<Bytes, Error>(page.freeze());
+
         let duration = start.elapsed();
 
-        let result: Result<Bytes, WalRedoError>;
-
         debug!(
-            "applied {} WAL records in {} ms to reconstruct page image at LSN {}",
+            "zenith applied {} WAL records in {} ms to reconstruct page image at LSN {}",
             nrecords,
             duration.as_millis(),
             lsn
         );
 
-        if let Err(e) = apply_result {
-            error!("could not apply WAL records: {:#}", e);
-            result = Err(WalRedoError::IoError(e));
-        } else {
-            let img = apply_result.unwrap();
-
-            result = Ok(img);
-        }
-
-        // The caller is responsible for sending the response
-        result
+        apply_result.map_err(WalRedoError::IoError)
     }
 }
 
@@ -444,18 +499,17 @@
 /// Handle to the Postgres WAL redo process
 ///
 struct PostgresRedoProcess {
+    child: Child,
     stdin: ChildStdin,
     stdout: ChildStdout,
+    stderr: ChildStderr,
 }
 
 impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
-    async fn launch(
-        conf: &PageServerConf,
-        tenantid: &ZTenantId,
-    ) -> Result<PostgresRedoProcess, Error> {
+    fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
@@ -476,7 +530,6 @@
             .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
             .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
             .output()
-            .await
            .expect("failed to execute initdb");
 
         if !initdb.status.success() {
@@ -513,102 +566,114 @@
             datadir.display()
         );
 
-        let stdin = child.stdin.take().expect("failed to open child's stdin");
-        let stderr = child.stderr.take().expect("failed to open child's stderr");
-        let stdout = child.stdout.take().expect("failed to open child's stdout");
+        let stdin = child.stdin.take().unwrap();
+        let stdout = child.stdout.take().unwrap();
+        let stderr = child.stderr.take().unwrap();
 
-        // This async block reads the child's stderr, and forwards it to the logger
-        let f_stderr = async {
-            let mut stderr_buffered = tokio::io::BufReader::new(stderr);
+        set_nonblock(stdin.as_raw_fd())?;
+        set_nonblock(stdout.as_raw_fd())?;
+        set_nonblock(stderr.as_raw_fd())?;
 
-            let mut line = String::new();
-            loop {
-                let res = stderr_buffered.read_line(&mut line).await;
-                if res.is_err() {
-                    debug!("could not convert line to utf-8");
-                    continue;
-                }
-                if res.unwrap() == 0 {
-                    break;
-                }
-                error!("wal-redo-postgres: {}", line.trim());
-                line.clear();
-            }
-            Ok::<(), Error>(())
-        };
-        tokio::spawn(f_stderr);
+        Ok(PostgresRedoProcess {
+            child,
+            stdin,
+            stdout,
+            stderr,
+        })
+    }
 
-        Ok(PostgresRedoProcess { stdin, stdout })
+    fn kill(mut self) {
+        let _ = self.child.kill();
+        if let Ok(exit_status) = self.child.wait() {
+            error!("wal-redo-postgres exited with code {}", exit_status);
+        }
+        drop(self);
    }
 
     //
     // Apply given WAL records ('records') over an old page image. Returns
     // new page image.
     //
-    async fn apply_wal_records(
-        &mut self,
-        tag: BufferTag,
-        base_img: Option<Bytes>,
-        records: &[(Lsn, WALRecord)],
-    ) -> Result<Bytes, Error> {
-        let stdout = &mut self.stdout;
-        // Buffer the writes to avoid a lot of small syscalls.
-        let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
+    fn apply_wal_records(&mut self, writebuf: Vec<u8>) -> Result<Bytes, Error> {
+        let mut nwrite = self.stdin.write(&writebuf)?;
+
+        // We expect the WAL redo process to respond with an 8k page image. We read it
+        // into this buffer.
+        let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()];
+        let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
+
+        // Prepare for calling poll()
+        let mut pollfds = [
+            PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
+        ];
 
         // We do three things simultaneously: send the old base image and WAL records to
         // the child process's stdin, read the result from child's stdout, and forward any logging
         // information that the child writes to its stderr to the page server's log.
-        //
-        // 'f_stdin' handles writing the base image and WAL records to the child process.
-        // 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
-        // tokio runtime in the 'launch' function already, forwards the logging.
-        let f_stdin = async {
-            // Send base image, if any. (If the record initializes the page, previous page
-            // version is not needed.)
-            timeout(
-                TIMEOUT,
-                stdin.write_all(&build_begin_redo_for_block_msg(tag)),
-            )
-            .await??;
-            if let Some(img) = base_img {
-                timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
+        while nresult < pg_constants::BLCKSZ.into() {
+            // If we have more data to write, wake up if 'stdin' becomes writeable or
+            // we have data to read. Otherwise only wake up if there's data to read.
+            let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
+            let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
+
+            if n == 0 {
+                return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
             }
 
-            // Send WAL records.
-            for (lsn, rec) in records.iter() {
-                WAL_REDO_RECORD_COUNTER.inc();
+            // If we have some messages in stderr, forward them to the log.
+            let err_revents = pollfds[1].revents().unwrap();
+            if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
+                let mut errbuf: [u8; 16384] = [0; 16384];
+                let n = self.stderr.read(&mut errbuf)?;
 
-                stdin
-                    .write_all(&build_apply_record_msg(*lsn, &rec.rec))
-                    .await?;
+                // The message might not be split correctly into lines here. But this is
+                // good enough, the important thing is to get the message to the log.
+                if n > 0 {
+                    error!(
+                        "wal-redo-postgres: {}",
+                        String::from_utf8_lossy(&errbuf[0..n])
+                    );
 
-                //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
-                //       r.lsn >> 32, r.lsn & 0xffff_ffff);
+                    // To make sure we capture all log from the process if it fails, keep
+                    // reading from the stderr, before checking the stdout.
+                    continue;
+                }
+            } else if err_revents.contains(PollFlags::POLLHUP) {
+                return Err(Error::new(
+                    ErrorKind::BrokenPipe,
+                    "WAL redo process closed its stderr unexpectedly",
+                ));
             }
-            //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
-            //       records.len(), lsn >> 32, lsn & 0xffff_ffff);
 
-            // Send GetPage command to get the result back
-            timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
-            timeout(TIMEOUT, stdin.flush()).await??;
-            //debug!("sent GetPage for {}", tag.blknum);
-            Ok::<(), Error>(())
-        };
+            // If we have more data to write and 'stdin' is writeable, write it.
+            if nwrite < writebuf.len() {
+                let in_revents = pollfds[2].revents().unwrap();
+                if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
+                    nwrite += self.stdin.write(&writebuf[nwrite..])?;
+                } else if in_revents.contains(PollFlags::POLLHUP) {
+                    // We still have more data to write, but the process closed the pipe.
+                    return Err(Error::new(
+                        ErrorKind::BrokenPipe,
+                        "WAL redo process closed its stdin unexpectedly",
+                    ));
+                }
+            }
 
-        // Read back new page image
-        let f_stdout = async {
-            let mut buf = [0u8; 8192];
+            // If we have some data in stdout, read it to the result buffer.
+            let out_revents = pollfds[0].revents().unwrap();
+            if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
+                nresult += self.stdout.read(&mut resultbuf[nresult..])?;
+            } else if out_revents.contains(PollFlags::POLLHUP) {
+                return Err(Error::new(
+                    ErrorKind::BrokenPipe,
+                    "WAL redo process closed its stdout unexpectedly",
+                ));
+            }
+        }
 
-            timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
-            //debug!("got response for {}", tag.blknum);
-            Ok::<[u8; 8192], Error>(buf)
-        };
-
-        let res = tokio::try_join!(f_stdout, f_stdin)?;
-
-        let buf = res.0;
-
-        Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
+        Ok(Bytes::from(resultbuf))
     }
 }
 
@@ -616,62 +681,42 @@
 // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
 // explanation of the protocol.
 
-fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
+fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
     let len = 4 + 1 + 4 * 4;
 
-    let mut buf = Vec::with_capacity(1 + len);
-
     buf.put_u8(b'B');
     buf.put_u32(len as u32);
-    tag.ser_into(&mut buf)
+    tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
 
-fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
+fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
     assert!(base_img.len() == 8192);
 
     let len = 4 + 1 + 4 * 4 + base_img.len();
 
-    let mut buf = Vec::with_capacity(1 + len);
-
     buf.put_u8(b'P');
     buf.put_u32(len as u32);
-    tag.ser_into(&mut buf)
+    tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
     buf.put(base_img);
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
 
-fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
+fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
     let len = 4 + 8 + rec.len();
 
-    let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
-
     buf.put_u8(b'A');
     buf.put_u32(len as u32);
     buf.put_u64(endlsn.0);
     buf.put(rec);
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
 
-fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
+fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
     let len = 4 + 1 + 4 * 4;
 
-    let mut buf = Vec::with_capacity(1 + len);
-
     buf.put_u8(b'G');
     buf.put_u32(len as u32);
-    tag.ser_into(&mut buf)
+    tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
-
-    debug_assert!(buf.len() == 1 + len);
-
-    buf
 }
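[Editorial note: the poll()-driven loop above is the crux of this patch: the request must be written and the reply read concurrently, or writer and reader can deadlock once the pipe buffers fill. Below is a stripped-down, standalone version of the same pattern, illustrative only: it uses `cat` as a stand-in for the wal-redo process, sets O_NONBLOCK inline instead of via the new `set_nonblock` helper, and omits the stderr forwarding and timeout handling. It relies, as the patched code does, on nix 0.23's errors converting into `std::io::Error`.]

```rust
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use nix::poll::{poll, PollFd, PollFlags};
use std::io::{Read, Write};
use std::os::unix::io::AsRawFd;
use std::process::{Command, Stdio};

// Round-trip `input` through a child process without deadlocking, even when
// `input` is larger than the OS pipe buffers.
fn echo_through_child(input: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = child.stdout.take().unwrap();

    // Writes must not block, or we could get stuck with both pipes full.
    // (Simplified: this clobbers other status flags; prefer F_GETFL first.)
    for fd in &[stdin.as_raw_fd(), stdout.as_raw_fd()] {
        fcntl(*fd, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))?;
    }

    let mut nwritten = 0;
    let mut out = Vec::new();
    while out.len() < input.len() {
        let mut fds = [
            PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
            PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
        ];
        // Only wait for writability while there is still data left to send.
        let nfds = if nwritten < input.len() { 2 } else { 1 };
        poll(&mut fds[0..nfds], -1)?;

        if nwritten < input.len() && fds[1].revents().unwrap().contains(PollFlags::POLLOUT) {
            nwritten += stdin.write(&input[nwritten..])?;
        }
        if fds[0].revents().unwrap().contains(PollFlags::POLLIN) {
            let mut buf = [0u8; 8192];
            let n = stdout.read(&mut buf)?;
            out.extend_from_slice(&buf[..n]);
        }
    }
    child.kill().ok();
    child.wait().ok();
    Ok(out)
}
```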
diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml
index 6571fae042..af0577e4d6 100644
--- a/zenith_utils/Cargo.toml
+++ b/zenith_utils/Cargo.toml
@@ -27,6 +27,7 @@ workspace_hack = { path = "../workspace_hack" }
 rand = "0.8.3"
 jsonwebtoken = "7"
 hex = { version = "0.4.3", features = ["serde"] }
+nix = "0.23.0"
 
 rustls = "0.19.1"
 rustls-split = "0.2.1"
diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs
index 96b3cf5066..4ae1b7e491 100644
--- a/zenith_utils/src/lib.rs
+++ b/zenith_utils/src/lib.rs
@@ -40,3 +40,7 @@ pub mod logging;
 
 // Misc
 pub mod accum;
+
+// Utility for putting a raw file descriptor into non-blocking mode
+pub mod nonblock;
+
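[Editorial note: the body of the new `zenith_utils/src/nonblock.rs` module is not shown in this diff. For reference, a minimal implementation matching the `set_nonblock(fd)?` call sites in walredo.rs could look like the following sketch; it assumes an `io::Result`-returning signature and the nix 0.23 error-to-`io::Error` conversion the rest of the patch already relies on.]

```rust
// Sketch of zenith_utils/src/nonblock.rs (the actual module body is not
// part of this patch).
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use std::os::unix::io::RawFd;

/// Put the given file descriptor into non-blocking mode.
pub fn set_nonblock(fd: RawFd) -> std::io::Result<()> {
    // Fetch the current file status flags and add O_NONBLOCK to them,
    // rather than clobbering whatever else happens to be set.
    let bits = fcntl(fd, FcntlArg::F_GETFL)?;
    let mut flags = OFlag::from_bits_truncate(bits);
    flags.insert(OFlag::O_NONBLOCK);
    fcntl(fd, FcntlArg::F_SETFL(flags))?;
    Ok(())
}
```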