Compare commits

...

16 Commits

Author SHA1 Message Date
Joonas Koivunen
6a706af1e5 bench: remove prelaunch 2022-11-17 18:44:12 +02:00
Joonas Koivunen
2a646ed989 create less threads in bench 2022-11-17 18:33:27 +02:00
Joonas Koivunen
9a8dbd3fe6 perf: simple walredo bench (#2816)
adds a simple walredo bench to allow some comparison of the walredo
throughput.

Cc: #1339, #2778
2022-11-17 18:32:18 +02:00
Konstantin Knizhnik
fa73b8ebe5 Fix unit test build 2022-11-15 22:51:57 +02:00
Konstantin Knizhnik
ecb4539230 Support multiple walredo processes per tenant
refer #2770
2022-11-15 20:53:24 +02:00
Konstantin Knizhnik
74e77d8fe0 Fix merge conflicts 2022-11-15 12:57:09 +02:00
Konstantin Knizhnik
aa40544231 Replace panic! with error! in walredo thread 2022-11-15 12:31:23 +02:00
Konstantin Knizhnik
fa69ee3f90 Replace ring buffer for buffered walredp requests with VecDeqeu 2022-11-15 12:31:22 +02:00
Konstantin Knizhnik
e6b484dbea Include sender in sent message 2022-11-15 12:30:55 +02:00
Konstantin Knizhnik
32e0461863 Make clippy happy 2022-11-15 12:30:55 +02:00
Konstantin Knizhnik
a30cdea71f Make clippy happy 2022-11-15 12:30:55 +02:00
Konstantin Knizhnik
9ecc54e30b Fix merge conflicts 2022-11-15 12:30:55 +02:00
Konstantin Knizhnik
db2b14de75 Fix merge conflicts 2022-11-15 12:30:55 +02:00
Konstantin Knizhnik
3df753249a Use nonblocking IO for walredo pipe communication 2022-11-15 12:30:53 +02:00
Konstantin Knizhnik
915144ac96 Do buffering requests to walredo process 2022-11-15 12:29:44 +02:00
Konstantin Knizhnik
142e347c81 Send requests to walredo process through the channel 2022-11-15 12:29:08 +02:00
7 changed files with 763 additions and 248 deletions

View File

@@ -76,3 +76,7 @@ tempfile = "3.2"
[[bench]]
name = "bench_layer_map"
harness = false
[[bench]]
name = "bench_walredo"
harness = false

File diff suppressed because one or more lines are too long

View File

@@ -614,8 +614,9 @@ impl PageServerConf {
PathBuf::from(format!("../tmp_check/test_{test_name}"))
}
#[cfg(test)]
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
PageServerConf {
id: NodeId(0),
wait_lsn_timeout: Duration::from_secs(60),
@@ -626,7 +627,7 @@ impl PageServerConf {
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "cloud_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir: PathBuf::new(),
pg_distrib_dir,
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,

View File

@@ -50,7 +50,7 @@ use crate::storage_sync::index::RemoteIndex;
use crate::task_mgr;
use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::WalRedoManager;
use crate::walredo::{PostgresRedoManager, WalRedoManager};
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
pub use pageserver_api::models::TenantState;
@@ -119,7 +119,7 @@ pub struct Tenant {
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
walredo_mgrs: Mutex<HashMap<u32, Arc<dyn WalRedoManager + Send + Sync>>>,
// provides access to timeline data sitting in the remote storage
// supposed to be used for retrieval of remote consistent lsn in walreceiver
@@ -831,6 +831,19 @@ impl Tenant {
}
let pg_version = new_metadata.pg_version();
let walredo_mgr = self
.walredo_mgrs
.lock()
.unwrap()
.entry(pg_version)
.or_insert_with(|| {
Arc::new(PostgresRedoManager::new(
self.conf,
self.tenant_id,
pg_version,
))
})
.clone();
Ok(Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
@@ -838,7 +851,7 @@ impl Tenant {
ancestor,
new_timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
walredo_mgr,
self.upload_layers,
pg_version,
))
@@ -847,7 +860,6 @@ impl Tenant {
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
remote_index: RemoteIndex,
upload_layers: bool,
@@ -857,9 +869,9 @@ impl Tenant {
tenant_id,
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
walredo_mgrs: Mutex::new(HashMap::new()),
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
remote_index,
upload_layers,
state,
@@ -1731,18 +1743,16 @@ pub mod harness {
}
pub fn try_load(&self) -> anyhow::Result<Tenant> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Tenant::new(
self.conf,
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
self.tenant_id,
RemoteIndex::default(),
false,
);
// populate tenant with locally available timelines
let mut timelines_to_load = HashMap::new();
let mut pg_version = 0u32;
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
.expect("should be able to read timelines dir")
{
@@ -1755,8 +1765,14 @@ pub mod harness {
.parse()?;
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
pg_version = timeline_metadata.pg_version();
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant
.walredo_mgrs
.lock()
.unwrap()
.insert(pg_version, Arc::new(TestRedoManager));
tenant.init_attach_timelines(timelines_to_load)?;
tenant.set_state(TenantState::Active {
background_jobs_running: false,

View File

@@ -216,7 +216,6 @@ impl TenantConf {
}
}
#[cfg(test)]
pub fn dummy_conf() -> Self {
TenantConf {
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,

View File

@@ -21,7 +21,6 @@ use crate::tenant::{
ephemeral_file::is_ephemeral_file, metadata::TimelineMetadata, Tenant, TenantState,
};
use crate::tenant_config::TenantConfOpt;
use crate::walredo::PostgresRedoManager;
use crate::TEMP_FILE_SUFFIX;
use utils::crashsafe::{self, path_with_suffix_extension};
@@ -154,7 +153,6 @@ pub fn attach_local_tenants(
let tenant = Arc::new(Tenant::new(
conf,
TenantConfOpt::default(),
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
tenant_id,
remote_index.clone(),
conf.remote_storage_config.is_some(),
@@ -384,12 +382,10 @@ pub fn create_tenant(
Ok(None)
}
hash_map::Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
create_tenant_files(conf, tenant_conf, tenant_id)?;
let tenant = Arc::new(Tenant::new(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_index,
conf.remote_storage_config.is_some(),

View File

@@ -22,6 +22,7 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
@@ -41,7 +43,6 @@ use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
@@ -57,6 +58,10 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
const N_CHANNELS: usize = 16;
const CHANNEL_SIZE: usize = 1024 * 1024;
const ERR_BUF_SIZE: usize = 8192;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -92,16 +97,18 @@ pub trait WalRedoManager: Send + Sync {
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// perform WAL replay. It multiplexes requests from multiple threads
/// using the `sender` channel and sends them to the postgres wal-redo process
/// pipe from a separate thread. Responses are returned through a set of `receivers`
/// channels, used in a round-robin manner. The receiver thread is protected by a mutex
/// to prevent its usage by more than one thread
/// In the future, we might want to launch a pool of processes to allow concurrent
/// replay of multiple records.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
// multiplexor pipe: use sync_channel to allow sharing the sender by multiple threads
// and to limit the size of the buffer
sender: SyncSender<(Sender<Bytes>, Vec<u8>)>,
}
/// Can this request be served by neon redo functions
@@ -149,7 +156,7 @@ impl WalRedoManager for PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
_pg_version: u32,
) -> Result<Bytes, WalRedoError> {
if records.is_empty() {
error!("invalid WAL redo request with no records");
@@ -166,14 +173,7 @@ impl WalRedoManager for PostgresRedoManager {
let result = if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
} else {
self.apply_batch_postgres(
key,
lsn,
img,
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
)
self.apply_batch_postgres(key, lsn, img, &records[batch_start..i])
};
img = Some(result?);
@@ -185,14 +185,7 @@ impl WalRedoManager for PostgresRedoManager {
if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
} else {
self.apply_batch_postgres(
key,
lsn,
img,
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
)
self.apply_batch_postgres(key, lsn, img, &records[batch_start..])
}
}
}
@@ -201,85 +194,103 @@ impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
pub fn new(
conf: &'static PageServerConf,
tenant_id: TenantId,
pg_version: u32,
) -> PostgresRedoManager {
#[allow(clippy::type_complexity)]
let (tx, rx): (
SyncSender<(Sender<Bytes>, Vec<u8>)>,
Receiver<(Sender<Bytes>, Vec<u8>)>,
) = mpsc::sync_channel(CHANNEL_SIZE);
let _proxy = std::thread::spawn(move || {
// This dirty hack is used for lazy spawning of the walredo process.
// Otherwise initdb called in bootstrap will conflict with initdb called by the main pageserver process
let mut input = Some(rx.recv().unwrap());
while let Ok(mut proc) = PostgresRedoProcess::launch(conf, tenant_id, pg_version) {
loop {
if let Some((sender, data)) = input.take() {
if proc.send(sender, data).is_err() {
break;
}
} else {
let (sender, data) = rx.recv().unwrap();
if proc.send(sender, data).is_err() {
break;
}
}
while let Ok((sender, data)) = rx.try_recv() {
if proc.send(sender, data).is_err() {
break;
}
}
if proc.receive().is_err() {
break;
}
}
}
panic!("Failed to launch wal-redo postgres");
});
PostgresRedoManager { sender: tx }
}
fn apply_wal_records(
&self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
) -> Result<Bytes, WalRedoError> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(WalRedoError::InvalidRecord);
}
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
WAL_REDO_RECORDS_HISTOGRAM.observe(records.len() as f64);
WAL_REDO_BYTES_HISTOGRAM.observe(writebuf.len() as f64);
let (tx, rx) = mpsc::channel();
self.sender.send((tx, writebuf)).unwrap();
Ok(rx.recv().unwrap())
}
///
/// Process one request for WAL redo using wal-redo postgres
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
///
fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
_lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let result = self.apply_wal_records(buf_tag, base_img, records);
let end_time = Instant::now();
let duration = end_time.duration_since(lock_time);
let len = records.len();
let nbytes = records.iter().fold(0, |acumulator, record| {
acumulator
+ match &record.1 {
NeonWalRecord::Postgres { rec, .. } => rec.len(),
_ => unreachable!("Only PostgreSQL records are accepted in this batch"),
}
});
WAL_REDO_TIME.observe(duration.as_secs_f64());
WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
debug!(
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
len,
nbytes,
duration.as_micros(),
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
records.len(),
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
}
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
result
}
@@ -582,17 +593,163 @@ impl<C: CommandExt> CloseFileDescriptors for C {
///
struct PostgresRedoProcess {
tenant_id: TenantId,
child: NoLeakChild,
_child: NoLeakChild,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
wal_redo_timeout: Duration,
// Double-ended queue for buffered response senders
resp_deque: VecDeque<Sender<Bytes>>,
// Reconstructed page
page: [u8; BLCKSZ as usize],
// Position in reconstructed page buffer
page_pos: usize,
// Poll file descriptors
poll_fds: [PollFd; 3],
}
impl PostgresRedoProcess {
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn receive(&mut self) -> Result<(), Error> {
while !self.resp_deque.is_empty() {
let n = loop {
match nix::poll::poll(
&mut self.poll_fds[0..2],
self.wal_redo_timeout.as_millis() as i32,
) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = self.poll_fds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; ERR_BUF_SIZE] = [0; ERR_BUF_SIZE];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = self.poll_fds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
if let Some(sender) = self.resp_deque.pop_front() {
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
} else {
return Err(Error::new(
ErrorKind::BrokenPipe,
"Malformed output of walredo process",
));
}
self.page_pos = 0;
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(())
}
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
fn send(&mut self, sender: Sender<Bytes>, data: Vec<u8>) -> Result<(), Error> {
let mut written = 0usize;
let data_len = data.len();
self.resp_deque.push_back(sender);
while written < data_len {
let n = loop {
match nix::poll::poll(
&mut self.poll_fds[0..3],
self.wal_redo_timeout.as_millis() as i32,
) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = self.poll_fds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; ERR_BUF_SIZE] = [0; ERR_BUF_SIZE];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
panic!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
let in_revents = self.poll_fds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
written += self.stdin.write(&data[written..data_len])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = self.poll_fds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
if self.page_pos == BLCKSZ as usize {
if let Some(sender) = self.resp_deque.pop_front() {
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
} else {
return Err(Error::new(
ErrorKind::BrokenPipe,
"Malformed output of walredo process",
));
}
self.page_pos = 0;
}
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(())
}
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
#[instrument(skip_all,fields(tenant_id=%tenant_id))]
fn launch(
conf: &PageServerConf,
tenant_id: TenantId,
@@ -605,7 +762,6 @@ impl PostgresRedoProcess {
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
info!(
@@ -627,9 +783,14 @@ impl PostgresRedoProcess {
)
})?;
info!("running initdb in {}", datadir.display());
info!(
"running initdb in {} for pg_verson={}",
datadir.display(),
pg_version
);
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-n")
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
@@ -709,152 +870,25 @@ impl PostgresRedoProcess {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let poll_fds = [
PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
];
Ok(PostgresRedoProcess {
tenant_id,
child,
_child: child,
stdin,
stdout,
stderr,
wal_redo_timeout: conf.wal_redo_timeout,
resp_deque: VecDeque::with_capacity(N_CHANNELS),
page: [0u8; BLCKSZ as usize],
page_pos: 0,
poll_fds,
})
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn kill(self) {
self.child.kill_and_wait();
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn apply_wal_records(
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(
ErrorKind::Other,
"tried to pass neon wal record to postgres WAL redo",
));
}
}
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
// The input is now in 'writebuf'. Do a blind write first, writing as much as
// we can, before calling poll(). That skips one call to poll() if the stdin is
// already available for writing, which it almost certainly is because the
// process is idle.
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
while nresult < BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
let n = loop {
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let n = self.stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(Bytes::from(resultbuf))
}
}
/// Wrapper type around `std::process::Child` which guarantees that the child