diff --git a/src/walredo.rs b/src/walredo.rs index e37e2340dc..42a050bc72 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -15,14 +15,16 @@ // it's not a secure sandbox. // use tokio::runtime::Runtime; -use tokio::process::{Command}; +use tokio::process::{Command, Child, ChildStdin, ChildStdout}; use std::process::Stdio; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::AsyncBufReadExt; use std::io::Error; +use std::cell::RefCell; use std::assert; use std::sync::{Arc}; use log::*; +use std::time::Instant; use bytes::{Bytes, BytesMut, BufMut}; @@ -46,21 +48,52 @@ pub fn wal_applicator_main() // Loop forever, handling requests as they come. let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver; loop { - let request = walredo_channel_receiver.recv().unwrap(); - handle_apply_request(&runtime, request); + let mut process: WalRedoProcess; + + info!("launching WAL redo postgres process"); + { + let _guard = runtime.enter(); + process = WalRedoProcess::launch().unwrap(); + } + + // Pretty arbitrarily, reuse the same Postgres process for 100 requests. + // After that, kill it and start a new one. This is mostly to avoid + // using up all shared buffers in Postgres's shared buffer cache; we don't + // want to write any pages to disk in the WAL redo process. + for _i in 1..100 { + + let request = walredo_channel_receiver.recv().unwrap(); + + handle_apply_request(&process, &runtime, request); + } + + info!("killing WAL redo postgres process"); + let _ = runtime.block_on(process.stdin.get_mut().shutdown()); + let mut child = process.child; + drop(process.stdin); + let _ = runtime.block_on(child.wait()); } } -fn handle_apply_request(runtime: &Runtime, entry_rc: Arc) +fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc) { let tag = entry_rc.key.tag; + let lsn = entry_rc.key.lsn; let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref()); let mut entry = entry_rc.content.lock().unwrap(); entry.apply_pending = false; - let apply_result = runtime.block_on(apply_wal_records(tag, base_img, records)); + let nrecords = records.len(); + + let start = Instant::now(); + let apply_result = process.apply_wal_records(runtime, tag, base_img, records); + let duration = start.elapsed(); + + debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", + nrecords, duration.as_millis(), + lsn >> 32, lsn & 0xffff_ffff); if let Err(e) = apply_result { error!("could not apply WAL records: {}", e); @@ -72,98 +105,120 @@ fn handle_apply_request(runtime: &Runtime, entry_rc: Arc) entry_rc.walredo_condvar.notify_all(); } -// -// Apply given WAL records ('records') over an old page image. Returns -// new page image. -// -async fn apply_wal_records(tag: BufferTag, base_img: Option, records: Vec) -> Result -{ - // - // Start postgres binary in special WAL redo mode. - // - let child = - Command::new("postgres") - .arg("--wal-redo") - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .expect("postgres --wal-redo command failed to start"); +struct WalRedoProcess { + child: Child, + stdin: RefCell, + stdout: RefCell, +} - let mut stdin = child.stdin.expect("failed to open child's stdin"); - let stderr = child.stderr.expect("failed to open childs' stderr"); - let mut stdout = child.stdout.expect("failed to open childs' stdout"); +impl WalRedoProcess { - // - // This async block sends all the commands to the process. - // - // For reasons I don't understand, this needs to be a "move" block; - // otherwise the stdin pipe doesn't get closed, despite the shutdown() - // call. - // - let f_stdin = async move { - // Send base image, if any. (If the record initializes the page, previous page - // version is not needed.) - stdin.write(&build_begin_redo_for_block_msg(tag)).await?; - if base_img.is_some() { - stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?; - } + fn launch() -> Result { + // + // Start postgres binary in special WAL redo mode. + // + let mut child = + Command::new("postgres") + .arg("--wal-redo") + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .expect("postgres --wal-redo command failed to start"); - // Send WAL records. - for rec in records.iter() { - let r = rec.clone(); + let stdin = child.stdin.take().expect("failed to open child's stdin"); + let stderr = child.stderr.take().expect("failed to open child's stderr"); + let stdout = child.stdout.take().expect("failed to open child's stdout"); - stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?; - } - debug!("sent {} WAL records to wal redo postgres process", records.len()); + // This async block reads the child's stderr, and forwards it to the logger + let f_stderr = async { + let mut stderr_buffered = tokio::io::BufReader::new(stderr); - // Send GetPage command to get the result back - stdin.write(&build_get_page_msg(tag)).await?; - debug!("sent GetPage"); - stdin.flush().await?; - stdin.shutdown().await?; - debug!("stdin finished"); - Ok::<(), Error>(()) - }; - - // This async block reads the child's stderr, and forwards it to the logger - let f_stderr = async move { - let mut stderr_buffered = tokio::io::BufReader::new(stderr); - - let mut line = String::new(); - loop { - let res = stderr_buffered.read_line(&mut line).await; - if res.is_err() { - debug!("could not convert line to utf-8"); - continue; + let mut line = String::new(); + loop { + let res = stderr_buffered.read_line(&mut line).await; + if res.is_err() { + debug!("could not convert line to utf-8"); + continue; + } + if res.unwrap() == 0 { + break; + } + debug!("{}", line.trim()); + line.clear(); } - if res.unwrap() == 0 { - break; - } - debug!("{}", line.trim()); - line.clear(); - } - Ok::<(), Error>(()) - }; + Ok::<(), Error>(()) + }; + tokio::spawn(f_stderr); - // Read back new page image - let f_stdout = async move { - let mut buf = [0u8; 8192]; + Ok(WalRedoProcess { + child: child, + stdin: RefCell::new(stdin), + stdout: RefCell::new(stdout), + }) + } - stdout.read_exact(&mut buf).await?; - debug!("got response"); - Ok::<[u8;8192], Error>(buf) - }; + // + // Apply given WAL records ('records') over an old page image. Returns + // new page image. + // + fn apply_wal_records(&self, runtime: &Runtime, tag: BufferTag, base_img: Option, records: Vec) -> Result + { + let mut stdin = self.stdin.borrow_mut(); + let mut stdout = self.stdout.borrow_mut(); + return runtime.block_on(async { - // Kill the process. This closes its stdin, which should signal the process - // to terminate. TODO: SIGKILL if needed - //child.wait(); + // + // This async block sends all the commands to the process. + // + // For reasons I don't understand, this needs to be a "move" block; + // otherwise the stdin pipe doesn't get closed, despite the shutdown() + // call. + // + let f_stdin = async { + // Send base image, if any. (If the record initializes the page, previous page + // version is not needed.) + stdin.write(&build_begin_redo_for_block_msg(tag)).await?; + if base_img.is_some() { + stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?; + } - let res = futures::try_join!(f_stdout, f_stdin, f_stderr)?; + // Send WAL records. + for rec in records.iter() { + let r = rec.clone(); - let buf = res.0; - - Ok::(Bytes::from(std::vec::Vec::from(buf))) + stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?; + } + //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", + // records.len(), lsn >> 32, lsn & 0xffff_ffff); + + // Send GetPage command to get the result back + stdin.write(&build_get_page_msg(tag)).await?; + stdin.flush().await?; + //debug!("sent GetPage for {}", tag.blknum); + Ok::<(), Error>(()) + }; + + // Read back new page image + let f_stdout = async { + let mut buf = [0u8; 8192]; + + stdout.read_exact(&mut buf).await?; + //debug!("got response for {}", tag.blknum); + Ok::<[u8;8192], Error>(buf) + }; + + // Kill the process. This closes its stdin, which should signal the process + // to terminate. TODO: SIGKILL if needed + //child.wait(); + + let res = futures::try_join!(f_stdout, f_stdin)?; + + let buf = res.0; + + Ok::(Bytes::from(std::vec::Vec::from(buf))) + }); + } } fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes