Reuse postgres --wal-redo process.

Use it for 100 requests before restarting the process.
This commit is contained in:
Heikki Linnakangas
2021-03-24 01:09:27 +02:00
committed by Stas Kelvich
parent 667ec0db60
commit bfc2ed26cf

View File

@@ -15,14 +15,16 @@
// it's not a secure sandbox.
//
use tokio::runtime::Runtime;
use tokio::process::{Command};
use tokio::process::{Command, Child, ChildStdin, ChildStdout};
use std::process::Stdio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncBufReadExt;
use std::io::Error;
use std::cell::RefCell;
use std::assert;
use std::sync::{Arc};
use log::*;
use std::time::Instant;
use bytes::{Bytes, BytesMut, BufMut};
@@ -46,21 +48,52 @@ pub fn wal_applicator_main()
// Loop forever, handling requests as they come.
let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver;
loop {
let request = walredo_channel_receiver.recv().unwrap();
handle_apply_request(&runtime, request);
let mut process: WalRedoProcess;
info!("launching WAL redo postgres process");
{
let _guard = runtime.enter();
process = WalRedoProcess::launch().unwrap();
}
// Pretty arbitrarily, reuse the same Postgres process for 100 requests.
// After that, kill it and start a new one. This is mostly to avoid
// using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process.
for _i in 1..100 {
let request = walredo_channel_receiver.recv().unwrap();
handle_apply_request(&process, &runtime, request);
}
info!("killing WAL redo postgres process");
let _ = runtime.block_on(process.stdin.get_mut().shutdown());
let mut child = process.child;
drop(process.stdin);
let _ = runtime.block_on(child.wait());
}
}
fn handle_apply_request(runtime: &Runtime, entry_rc: Arc<CacheEntry>)
fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>)
{
let tag = entry_rc.key.tag;
let lsn = entry_rc.key.lsn;
let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap();
entry.apply_pending = false;
let apply_result = runtime.block_on(apply_wal_records(tag, base_img, records));
let nrecords = records.len();
let start = Instant::now();
let apply_result = process.apply_wal_records(runtime, tag, base_img, records);
let duration = start.elapsed();
debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
nrecords, duration.as_millis(),
lsn >> 32, lsn & 0xffff_ffff);
if let Err(e) = apply_result {
error!("could not apply WAL records: {}", e);
@@ -72,98 +105,120 @@ fn handle_apply_request(runtime: &Runtime, entry_rc: Arc<CacheEntry>)
entry_rc.walredo_condvar.notify_all();
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
async fn apply_wal_records(tag: BufferTag, base_img: Option<Bytes>, records: Vec<WALRecord>) -> Result<Bytes, Error>
{
//
// Start postgres binary in special WAL redo mode.
//
let child =
Command::new("postgres")
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("postgres --wal-redo command failed to start");
struct WalRedoProcess {
child: Child,
stdin: RefCell<ChildStdin>,
stdout: RefCell<ChildStdout>,
}
let mut stdin = child.stdin.expect("failed to open child's stdin");
let stderr = child.stderr.expect("failed to open childs' stderr");
let mut stdout = child.stdout.expect("failed to open childs' stdout");
impl WalRedoProcess {
//
// This async block sends all the commands to the process.
//
// For reasons I don't understand, this needs to be a "move" block;
// otherwise the stdin pipe doesn't get closed, despite the shutdown()
// call.
//
let f_stdin = async move {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
stdin.write(&build_begin_redo_for_block_msg(tag)).await?;
if base_img.is_some() {
stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?;
}
fn launch() -> Result<WalRedoProcess, Error> {
//
// Start postgres binary in special WAL redo mode.
//
let mut child =
Command::new("postgres")
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("postgres --wal-redo command failed to start");
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr");
let stdout = child.stdout.take().expect("failed to open child's stdout");
stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?;
}
debug!("sent {} WAL records to wal redo postgres process", records.len());
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
// Send GetPage command to get the result back
stdin.write(&build_get_page_msg(tag)).await?;
debug!("sent GetPage");
stdin.flush().await?;
stdin.shutdown().await?;
debug!("stdin finished");
Ok::<(), Error>(())
};
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async move {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
}
if res.unwrap() == 0 {
break;
}
debug!("{}", line.trim());
line.clear();
}
if res.unwrap() == 0 {
break;
}
debug!("{}", line.trim());
line.clear();
}
Ok::<(), Error>(())
};
Ok::<(), Error>(())
};
tokio::spawn(f_stderr);
// Read back new page image
let f_stdout = async move {
let mut buf = [0u8; 8192];
Ok(WalRedoProcess {
child: child,
stdin: RefCell::new(stdin),
stdout: RefCell::new(stdout),
})
}
stdout.read_exact(&mut buf).await?;
debug!("got response");
Ok::<[u8;8192], Error>(buf)
};
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
fn apply_wal_records(&self, runtime: &Runtime, tag: BufferTag, base_img: Option<Bytes>, records: Vec<WALRecord>) -> Result<Bytes, Error>
{
let mut stdin = self.stdin.borrow_mut();
let mut stdout = self.stdout.borrow_mut();
return runtime.block_on(async {
// Kill the process. This closes its stdin, which should signal the process
// to terminate. TODO: SIGKILL if needed
//child.wait();
//
// This async block sends all the commands to the process.
//
// For reasons I don't understand, this needs to be a "move" block;
// otherwise the stdin pipe doesn't get closed, despite the shutdown()
// call.
//
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
stdin.write(&build_begin_redo_for_block_msg(tag)).await?;
if base_img.is_some() {
stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?;
}
let res = futures::try_join!(f_stdout, f_stdin, f_stderr)?;
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?;
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
stdin.write(&build_get_page_msg(tag)).await?;
stdin.flush().await?;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
};
// Read back new page image
let f_stdout = async {
let mut buf = [0u8; 8192];
stdout.read_exact(&mut buf).await?;
//debug!("got response for {}", tag.blknum);
Ok::<[u8;8192], Error>(buf)
};
// Kill the process. This closes its stdin, which should signal the process
// to terminate. TODO: SIGKILL if needed
//child.wait();
let res = futures::try_join!(f_stdout, f_stdin)?;
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
});
}
}
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes