mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
pageserver - don't use tokio for walredo
This commit is contained in:
24
Cargo.lock
generated
24
Cargo.lock
generated
@@ -177,9 +177,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.2.1"
|
||||
version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
|
||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "bitvec"
|
||||
@@ -335,7 +335,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"hex",
|
||||
"lazy_static",
|
||||
"nix",
|
||||
"nix 0.20.0",
|
||||
"pageserver",
|
||||
"postgres",
|
||||
"postgres_ffi",
|
||||
@@ -924,9 +924,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.101"
|
||||
version = "0.2.103"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21"
|
||||
checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -1072,6 +1072,19 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.23.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cc",
|
||||
"cfg-if 1.0.0",
|
||||
"libc",
|
||||
"memoffset",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "6.1.2"
|
||||
@@ -1209,6 +1222,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"nix 0.23.0",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
|
||||
@@ -35,6 +35,7 @@ scopeguard = "1.1.0"
|
||||
rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
|
||||
async-trait = "0.1"
|
||||
const_format = "0.2.21"
|
||||
nix = "0.23.0"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
//! process, he cannot escape out of it.
|
||||
|
||||
mod nonrel;
|
||||
mod process_utils;
|
||||
mod request;
|
||||
|
||||
use bytes::Bytes;
|
||||
@@ -28,16 +29,16 @@ use serde::Serialize;
|
||||
use std::fs;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::prelude::*;
|
||||
use std::io::BufReader;
|
||||
use std::io::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::process::ChildStdin;
|
||||
use std::process::ChildStdout;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::{ChildStdin, ChildStdout, Command};
|
||||
use tokio::time::timeout;
|
||||
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
@@ -132,7 +133,6 @@ pub struct PostgresRedoManager {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
runtime: tokio::runtime::Runtime,
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
}
|
||||
|
||||
@@ -193,15 +193,12 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = self
|
||||
.runtime
|
||||
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
|
||||
*process_guard = Some(p);
|
||||
let redo_process = PostgresRedoProcess::launch(self.conf, &self.tenantid)?;
|
||||
*process_guard = Some(redo_process);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
|
||||
self.runtime
|
||||
.block_on(self.handle_apply_request(process, &request))
|
||||
self.handle_apply_request(process, &request)
|
||||
};
|
||||
end_time = Instant::now();
|
||||
|
||||
@@ -216,18 +213,9 @@ impl PostgresRedoManager {
|
||||
///
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
|
||||
// We block on waiting for requests on the walredo request channel, but
|
||||
// use async I/O to communicate with the child process. Initialize the
|
||||
// runtime for the async part.
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> Self {
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
runtime,
|
||||
Self {
|
||||
tenantid,
|
||||
conf,
|
||||
process: Mutex::new(None),
|
||||
@@ -237,7 +225,7 @@ impl PostgresRedoManager {
|
||||
///
|
||||
/// Process one request for WAL redo.
|
||||
///
|
||||
async fn handle_apply_request(
|
||||
fn handle_apply_request(
|
||||
&self,
|
||||
process: &mut PostgresRedoProcess,
|
||||
request: &WalRedoRequest,
|
||||
@@ -250,9 +238,7 @@ impl PostgresRedoManager {
|
||||
rel,
|
||||
blknum: request.blknum,
|
||||
};
|
||||
process
|
||||
.apply_wal_records(buf_tag, &request.base_img, &request.records)
|
||||
.await
|
||||
process.apply_wal_records(buf_tag, &request.base_img, &request.records)
|
||||
} else {
|
||||
Ok(nonrel::apply_nonrel(request))
|
||||
};
|
||||
@@ -294,10 +280,7 @@ impl PostgresRedoProcess {
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
async fn launch(
|
||||
conf: &PageServerConf,
|
||||
tenantid: &ZTenantId,
|
||||
) -> Result<PostgresRedoProcess, Error> {
|
||||
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
@@ -318,7 +301,6 @@ impl PostgresRedoProcess {
|
||||
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
|
||||
.output()
|
||||
.await
|
||||
.expect("failed to execute initdb");
|
||||
|
||||
if !initdb.status.success() {
|
||||
@@ -359,26 +341,26 @@ impl PostgresRedoProcess {
|
||||
let stderr = child.stderr.take().expect("failed to open child's stderr");
|
||||
let stdout = child.stdout.take().expect("failed to open child's stdout");
|
||||
|
||||
// This async block reads the child's stderr, and forwards it to the logger
|
||||
let f_stderr = async {
|
||||
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
|
||||
process_utils::set_nonblocking(&stdin)?;
|
||||
process_utils::set_nonblocking(&stdout)?;
|
||||
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
let res = stderr_buffered.read_line(&mut line).await;
|
||||
if res.is_err() {
|
||||
debug!("could not convert line to utf-8");
|
||||
continue;
|
||||
std::thread::Builder::new()
|
||||
.name("wal-redo-stderr-proxy".to_string())
|
||||
.spawn(|| {
|
||||
let mut stderr_buffered = BufReader::new(stderr);
|
||||
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
line.clear();
|
||||
|
||||
match stderr_buffered.read_line(&mut line) {
|
||||
Ok(0) => break,
|
||||
Ok(_) => error!("wal-redo-postgres: {}", line.trim()),
|
||||
Err(e) => debug!("error reading wal-redo stderr: {:#?}", e),
|
||||
}
|
||||
}
|
||||
if res.unwrap() == 0 {
|
||||
break;
|
||||
}
|
||||
error!("wal-redo-postgres: {}", line.trim());
|
||||
line.clear();
|
||||
}
|
||||
Ok::<(), Error>(())
|
||||
};
|
||||
tokio::spawn(f_stderr);
|
||||
})
|
||||
.unwrap(); // TODO propogate error
|
||||
|
||||
Ok(PostgresRedoProcess { stdin, stdout })
|
||||
}
|
||||
@@ -387,41 +369,30 @@ impl PostgresRedoProcess {
|
||||
// Apply given WAL records ('records') over an old page image. Returns
|
||||
// new page image.
|
||||
//
|
||||
async fn apply_wal_records(
|
||||
fn apply_wal_records(
|
||||
&mut self,
|
||||
tag: BufferTag,
|
||||
base_img: &Option<Bytes>,
|
||||
base_img_opt: &Option<Bytes>,
|
||||
records: &[WALRecord],
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
let stdin = &mut self.stdin;
|
||||
let stdout = &mut self.stdout;
|
||||
|
||||
// We do three things simultaneously: send the old base image and WAL records to
|
||||
// the child process's stdin, read the result from child's stdout, and forward any logging
|
||||
// information that the child writes to its stderr to the page server's log.
|
||||
//
|
||||
// 'f_stdin' handles writing the base image and WAL records to the child process.
|
||||
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
|
||||
// tokio runtime in the 'launch' function already, forwards the logging.
|
||||
let f_stdin = async {
|
||||
let buf = request::serialize_request(tag, base_img, records);
|
||||
|
||||
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
|
||||
timeout(TIMEOUT, stdin.flush()).await??;
|
||||
|
||||
Ok::<(), Error>(())
|
||||
) -> std::io::Result<Bytes> {
|
||||
let mut timeout_writer = process_utils::TimeoutWriter {
|
||||
timeout: TIMEOUT,
|
||||
writer: &mut self.stdin,
|
||||
};
|
||||
|
||||
// Read back new page image
|
||||
let f_stdout = async {
|
||||
let mut buf = vec![0u8; 8192];
|
||||
let buf = request::serialize_request(tag, base_img_opt, records);
|
||||
timeout_writer.write_all(&buf)?;
|
||||
drop(timeout_writer);
|
||||
|
||||
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
|
||||
//debug!("got response for {}", tag.blknum);
|
||||
Ok::<Vec<u8>, Error>(buf)
|
||||
let mut buf = vec![0u8; 8192];
|
||||
|
||||
let mut timeout_reader = process_utils::TimeoutReader {
|
||||
timeout: TIMEOUT,
|
||||
reader: &mut self.stdout,
|
||||
};
|
||||
|
||||
let (buf, _) = tokio::try_join!(f_stdout, f_stdin)?;
|
||||
Ok::<Bytes, Error>(Bytes::from(buf))
|
||||
timeout_reader.read_exact(&mut buf)?;
|
||||
|
||||
Ok(Bytes::from(buf))
|
||||
}
|
||||
}
|
||||
|
||||
123
pageserver/src/walredo/process_utils.rs
Normal file
123
pageserver/src/walredo/process_utils.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use std::{
|
||||
convert::TryInto,
|
||||
io::{ErrorKind, Read},
|
||||
os::unix::prelude::AsRawFd,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
pub fn set_nonblocking(fd: &impl AsRawFd) -> std::io::Result<()> {
|
||||
use nix::fcntl::{fcntl, FcntlArg, OFlag};
|
||||
|
||||
let fd = fd.as_raw_fd();
|
||||
let flags_bits = fcntl(fd, FcntlArg::F_GETFL).unwrap();
|
||||
let mut flags = OFlag::from_bits(flags_bits).unwrap();
|
||||
flags.insert(OFlag::O_NONBLOCK);
|
||||
fcntl(fd, FcntlArg::F_SETFL(flags)).unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct TimeoutReader<'a, R> {
|
||||
pub timeout: Duration,
|
||||
pub reader: &'a mut R,
|
||||
}
|
||||
|
||||
impl<R: Read + AsRawFd> std::io::Read for TimeoutReader<'_, R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let mut start_time_opt: Option<Instant> = None;
|
||||
loop {
|
||||
match self.reader.read(buf) {
|
||||
ok @ Ok(_) => return ok,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
|
||||
err @ Err(_) => return err,
|
||||
}
|
||||
|
||||
let timeout = if let Some(start_time) = start_time_opt {
|
||||
let elapsed = start_time.elapsed();
|
||||
match self.timeout.checked_sub(elapsed) {
|
||||
Some(timeout) => timeout,
|
||||
None => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"read timed out",
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
start_time_opt = Some(Instant::now());
|
||||
self.timeout
|
||||
};
|
||||
|
||||
use nix::{
|
||||
errno::Errno,
|
||||
poll::{poll, PollFd, PollFlags},
|
||||
};
|
||||
let mut poll_fd = PollFd::new(self.reader.as_raw_fd(), PollFlags::POLLIN);
|
||||
|
||||
let millis: i32 = timeout.as_millis().try_into().unwrap_or(i32::MAX);
|
||||
|
||||
match poll(std::slice::from_mut(&mut poll_fd), millis) {
|
||||
Ok(0) => {}
|
||||
Ok(n) => {
|
||||
debug_assert!(n == 1);
|
||||
}
|
||||
Err(Errno::EINTR) => {}
|
||||
Err(e) => return Err(std::io::Error::from_raw_os_error(e as i32)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TimeoutWriter<'a, W: std::io::Write + AsRawFd> {
|
||||
pub timeout: Duration,
|
||||
pub writer: &'a mut W,
|
||||
}
|
||||
|
||||
impl<W: std::io::Write + AsRawFd> std::io::Write for TimeoutWriter<'_, W> {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
let mut start_time_opt: Option<Instant> = None;
|
||||
loop {
|
||||
match self.writer.write(buf) {
|
||||
ok @ Ok(_) => return ok,
|
||||
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
|
||||
err @ Err(_) => return err,
|
||||
}
|
||||
|
||||
let timeout = if let Some(start_time) = start_time_opt {
|
||||
let elapsed = start_time.elapsed();
|
||||
match self.timeout.checked_sub(elapsed) {
|
||||
Some(timeout) => timeout,
|
||||
None => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"write timed out",
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
start_time_opt = Some(Instant::now());
|
||||
self.timeout
|
||||
};
|
||||
|
||||
use nix::{
|
||||
errno::Errno,
|
||||
poll::{poll, PollFd, PollFlags},
|
||||
};
|
||||
let mut poll_fd = PollFd::new(self.writer.as_raw_fd(), PollFlags::POLLOUT);
|
||||
|
||||
let millis: i32 = timeout.as_millis().try_into().unwrap_or(i32::MAX);
|
||||
|
||||
match poll(std::slice::from_mut(&mut poll_fd), millis) {
|
||||
Ok(0) => {} // TODO want to check timeout before calling read again
|
||||
Ok(n) => {
|
||||
debug_assert!(n == 1);
|
||||
}
|
||||
Err(Errno::EINTR) => {}
|
||||
Err(e) => return Err(std::io::Error::from_raw_os_error(e as i32)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user