mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-03 06:10:36 +00:00
Compare commits
16 Commits
sk-shardin
...
jk/walredo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a706af1e5 | ||
|
|
2a646ed989 | ||
|
|
9a8dbd3fe6 | ||
|
|
fa73b8ebe5 | ||
|
|
ecb4539230 | ||
|
|
74e77d8fe0 | ||
|
|
aa40544231 | ||
|
|
fa69ee3f90 | ||
|
|
e6b484dbea | ||
|
|
32e0461863 | ||
|
|
a30cdea71f | ||
|
|
9ecc54e30b | ||
|
|
db2b14de75 | ||
|
|
3df753249a | ||
|
|
915144ac96 | ||
|
|
142e347c81 |
@@ -76,3 +76,7 @@ tempfile = "3.2"
|
||||
[[bench]]
|
||||
name = "bench_layer_map"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_walredo"
|
||||
harness = false
|
||||
|
||||
465
pageserver/benches/bench_walredo.rs
Normal file
465
pageserver/benches/bench_walredo.rs
Normal file
File diff suppressed because one or more lines are too long
@@ -614,8 +614,9 @@ impl PageServerConf {
|
||||
PathBuf::from(format!("../tmp_check/test_{test_name}"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
|
||||
let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
|
||||
|
||||
PageServerConf {
|
||||
id: NodeId(0),
|
||||
wait_lsn_timeout: Duration::from_secs(60),
|
||||
@@ -626,7 +627,7 @@ impl PageServerConf {
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
superuser: "cloud_admin".to_string(),
|
||||
workdir: repo_dir,
|
||||
pg_distrib_dir: PathBuf::new(),
|
||||
pg_distrib_dir,
|
||||
auth_type: AuthType::Trust,
|
||||
auth_validation_public_key_path: None,
|
||||
remote_storage_config: None,
|
||||
|
||||
@@ -50,7 +50,7 @@ use crate::storage_sync::index::RemoteIndex;
|
||||
use crate::task_mgr;
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::walredo::{PostgresRedoManager, WalRedoManager};
|
||||
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
|
||||
pub use pageserver_api::models::TenantState;
|
||||
|
||||
@@ -119,7 +119,7 @@ pub struct Tenant {
|
||||
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
|
||||
// timeout...
|
||||
gc_cs: Mutex<()>,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
walredo_mgrs: Mutex<HashMap<u32, Arc<dyn WalRedoManager + Send + Sync>>>,
|
||||
|
||||
// provides access to timeline data sitting in the remote storage
|
||||
// supposed to be used for retrieval of remote consistent lsn in walreceiver
|
||||
@@ -831,6 +831,19 @@ impl Tenant {
|
||||
}
|
||||
|
||||
let pg_version = new_metadata.pg_version();
|
||||
let walredo_mgr = self
|
||||
.walredo_mgrs
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(pg_version)
|
||||
.or_insert_with(|| {
|
||||
Arc::new(PostgresRedoManager::new(
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
pg_version,
|
||||
))
|
||||
})
|
||||
.clone();
|
||||
Ok(Timeline::new(
|
||||
self.conf,
|
||||
Arc::clone(&self.tenant_conf),
|
||||
@@ -838,7 +851,7 @@ impl Tenant {
|
||||
ancestor,
|
||||
new_timeline_id,
|
||||
self.tenant_id,
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
walredo_mgr,
|
||||
self.upload_layers,
|
||||
pg_version,
|
||||
))
|
||||
@@ -847,7 +860,6 @@ impl Tenant {
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
tenant_id: TenantId,
|
||||
remote_index: RemoteIndex,
|
||||
upload_layers: bool,
|
||||
@@ -857,9 +869,9 @@ impl Tenant {
|
||||
tenant_id,
|
||||
conf,
|
||||
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
|
||||
walredo_mgrs: Mutex::new(HashMap::new()),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
gc_cs: Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_index,
|
||||
upload_layers,
|
||||
state,
|
||||
@@ -1731,18 +1743,16 @@ pub mod harness {
|
||||
}
|
||||
|
||||
pub fn try_load(&self) -> anyhow::Result<Tenant> {
|
||||
let walredo_mgr = Arc::new(TestRedoManager);
|
||||
|
||||
let tenant = Tenant::new(
|
||||
self.conf,
|
||||
TenantConfOpt::from(self.tenant_conf),
|
||||
walredo_mgr,
|
||||
self.tenant_id,
|
||||
RemoteIndex::default(),
|
||||
false,
|
||||
);
|
||||
// populate tenant with locally available timelines
|
||||
let mut timelines_to_load = HashMap::new();
|
||||
let mut pg_version = 0u32;
|
||||
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
|
||||
.expect("should be able to read timelines dir")
|
||||
{
|
||||
@@ -1755,8 +1765,14 @@ pub mod harness {
|
||||
.parse()?;
|
||||
|
||||
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
|
||||
pg_version = timeline_metadata.pg_version();
|
||||
timelines_to_load.insert(timeline_id, timeline_metadata);
|
||||
}
|
||||
tenant
|
||||
.walredo_mgrs
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(pg_version, Arc::new(TestRedoManager));
|
||||
tenant.init_attach_timelines(timelines_to_load)?;
|
||||
tenant.set_state(TenantState::Active {
|
||||
background_jobs_running: false,
|
||||
|
||||
@@ -216,7 +216,6 @@ impl TenantConf {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn dummy_conf() -> Self {
|
||||
TenantConf {
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
|
||||
@@ -21,7 +21,6 @@ use crate::tenant::{
|
||||
ephemeral_file::is_ephemeral_file, metadata::TimelineMetadata, Tenant, TenantState,
|
||||
};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
|
||||
use utils::crashsafe::{self, path_with_suffix_extension};
|
||||
@@ -154,7 +153,6 @@ pub fn attach_local_tenants(
|
||||
let tenant = Arc::new(Tenant::new(
|
||||
conf,
|
||||
TenantConfOpt::default(),
|
||||
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
@@ -384,12 +382,10 @@ pub fn create_tenant(
|
||||
Ok(None)
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
create_tenant_files(conf, tenant_conf, tenant_id)?;
|
||||
let tenant = Arc::new(Tenant::new(
|
||||
conf,
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
remote_index,
|
||||
conf.remote_storage_config.is_some(),
|
||||
|
||||
@@ -22,6 +22,7 @@ use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::prelude::*;
|
||||
use std::io::{Error, ErrorKind};
|
||||
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::Mutex;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::{Receiver, Sender, SyncSender};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::{fs, io};
|
||||
@@ -41,7 +43,6 @@ use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
|
||||
|
||||
use crate::metrics::{
|
||||
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
|
||||
WAL_REDO_WAIT_TIME,
|
||||
};
|
||||
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
|
||||
use crate::repository::Key;
|
||||
@@ -57,6 +58,10 @@ use postgres_ffi::v14::nonrelfile_utils::{
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
const N_CHANNELS: usize = 16;
|
||||
const CHANNEL_SIZE: usize = 1024 * 1024;
|
||||
const ERR_BUF_SIZE: usize = 8192;
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
///
|
||||
@@ -92,16 +97,18 @@ pub trait WalRedoManager: Send + Sync {
|
||||
|
||||
///
|
||||
/// This is the real implementation that uses a Postgres process to
|
||||
/// perform WAL replay. Only one thread can use the process at a time,
|
||||
/// that is controlled by the Mutex. In the future, we might want to
|
||||
/// launch a pool of processes to allow concurrent replay of multiple
|
||||
/// records.
|
||||
/// perform WAL replay. It multiplexes requests from multiple threads
|
||||
/// using `sender` channel and send them to the postgres wal-redo process
|
||||
/// pipe by separate thread. Responses are returned through set of `receivers`
|
||||
/// channels, used in round robin manner. Receiver thread is protected by mutex
|
||||
/// to prevent it's usage by more than one thread
|
||||
/// In the future, we might want to launch a pool of processes to allow concurrent
|
||||
/// replay of multiple records.
|
||||
///
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_id: TenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
// mutiplexor pipe: use sync_channel to allow sharing sender by multiple threads
|
||||
// and limit size of buffer
|
||||
sender: SyncSender<(Sender<Bytes>, Vec<u8>)>,
|
||||
}
|
||||
|
||||
/// Can this request be served by neon redo functions
|
||||
@@ -149,7 +156,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
_pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
if records.is_empty() {
|
||||
error!("invalid WAL redo request with no records");
|
||||
@@ -166,14 +173,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
let result = if batch_neon {
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..i],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
self.apply_batch_postgres(key, lsn, img, &records[batch_start..i])
|
||||
};
|
||||
img = Some(result?);
|
||||
|
||||
@@ -185,14 +185,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
if batch_neon {
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
self.apply_batch_postgres(key, lsn, img, &records[batch_start..])
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -201,85 +194,103 @@ impl PostgresRedoManager {
|
||||
///
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
tenant_id,
|
||||
conf,
|
||||
process: Mutex::new(None),
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
pg_version: u32,
|
||||
) -> PostgresRedoManager {
|
||||
#[allow(clippy::type_complexity)]
|
||||
let (tx, rx): (
|
||||
SyncSender<(Sender<Bytes>, Vec<u8>)>,
|
||||
Receiver<(Sender<Bytes>, Vec<u8>)>,
|
||||
) = mpsc::sync_channel(CHANNEL_SIZE);
|
||||
let _proxy = std::thread::spawn(move || {
|
||||
// This dirty hack is used for lazy spawning if walredo process.
|
||||
// Otherwise initdb called in bootstrap will conflict with initdb called by main pageserver process
|
||||
let mut input = Some(rx.recv().unwrap());
|
||||
while let Ok(mut proc) = PostgresRedoProcess::launch(conf, tenant_id, pg_version) {
|
||||
loop {
|
||||
if let Some((sender, data)) = input.take() {
|
||||
if proc.send(sender, data).is_err() {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
let (sender, data) = rx.recv().unwrap();
|
||||
if proc.send(sender, data).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
while let Ok((sender, data)) = rx.try_recv() {
|
||||
if proc.send(sender, data).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if proc.receive().is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
panic!("Failed to launch wal-redo postgres");
|
||||
});
|
||||
|
||||
PostgresRedoManager { sender: tx }
|
||||
}
|
||||
|
||||
fn apply_wal_records(
|
||||
&self,
|
||||
tag: BufferTag,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
// but in practice the number of records is usually so small that it doesn't
|
||||
// matter, and it's better to keep this code simple.
|
||||
let mut writebuf: Vec<u8> = Vec::new();
|
||||
build_begin_redo_for_block_msg(tag, &mut writebuf);
|
||||
if let Some(img) = base_img {
|
||||
build_push_page_msg(tag, &img, &mut writebuf);
|
||||
}
|
||||
for (lsn, rec) in records.iter() {
|
||||
if let NeonWalRecord::Postgres {
|
||||
will_init: _,
|
||||
rec: postgres_rec,
|
||||
} = rec
|
||||
{
|
||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
return Err(WalRedoError::InvalidRecord);
|
||||
}
|
||||
}
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
WAL_REDO_RECORDS_HISTOGRAM.observe(records.len() as f64);
|
||||
WAL_REDO_BYTES_HISTOGRAM.observe(writebuf.len() as f64);
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.sender.send((tx, writebuf)).unwrap();
|
||||
Ok(rx.recv().unwrap())
|
||||
}
|
||||
|
||||
///
|
||||
/// Process one request for WAL redo using wal-redo postgres
|
||||
// Apply given WAL records ('records') over an old page image. Returns
|
||||
// new page image.
|
||||
///
|
||||
fn apply_batch_postgres(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
_lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result = process
|
||||
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
|
||||
.map_err(WalRedoError::IoError);
|
||||
|
||||
let result = self.apply_wal_records(buf_tag, base_img, records);
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(lock_time);
|
||||
|
||||
let len = records.len();
|
||||
let nbytes = records.iter().fold(0, |acumulator, record| {
|
||||
acumulator
|
||||
+ match &record.1 {
|
||||
NeonWalRecord::Postgres { rec, .. } => rec.len(),
|
||||
_ => unreachable!("Only PostgreSQL records are accepted in this batch"),
|
||||
}
|
||||
});
|
||||
|
||||
WAL_REDO_TIME.observe(duration.as_secs_f64());
|
||||
WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
|
||||
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
|
||||
|
||||
debug!(
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
|
||||
len,
|
||||
nbytes,
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
);
|
||||
|
||||
// If something went wrong, don't try to reuse the process. Kill it, and
|
||||
// next request will launch a new one.
|
||||
if result.is_err() {
|
||||
error!(
|
||||
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
|
||||
records.len(),
|
||||
nbytes,
|
||||
lsn
|
||||
);
|
||||
let process = process_guard.take().unwrap();
|
||||
process.kill();
|
||||
}
|
||||
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
|
||||
result
|
||||
}
|
||||
|
||||
@@ -582,17 +593,163 @@ impl<C: CommandExt> CloseFileDescriptors for C {
|
||||
///
|
||||
struct PostgresRedoProcess {
|
||||
tenant_id: TenantId,
|
||||
child: NoLeakChild,
|
||||
_child: NoLeakChild,
|
||||
stdin: ChildStdin,
|
||||
stdout: ChildStdout,
|
||||
stderr: ChildStderr,
|
||||
wal_redo_timeout: Duration,
|
||||
// Double ended queue for buffered response senders
|
||||
resp_deque: VecDeque<Sender<Bytes>>,
|
||||
// Reconstructed page
|
||||
page: [u8; BLCKSZ as usize],
|
||||
// Position in reconstructed page buufer
|
||||
page_pos: usize,
|
||||
// Pool file descriptors
|
||||
poll_fds: [PollFd; 3],
|
||||
}
|
||||
|
||||
impl PostgresRedoProcess {
|
||||
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
|
||||
fn receive(&mut self) -> Result<(), Error> {
|
||||
while !self.resp_deque.is_empty() {
|
||||
let n = loop {
|
||||
match nix::poll::poll(
|
||||
&mut self.poll_fds[0..2],
|
||||
self.wal_redo_timeout.as_millis() as i32,
|
||||
) {
|
||||
Err(e) if e == nix::errno::Errno::EINTR => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
}
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
let err_revents = self.poll_fds[1].revents().unwrap();
|
||||
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
let mut errbuf: [u8; ERR_BUF_SIZE] = [0; ERR_BUF_SIZE];
|
||||
let n = self.stderr.read(&mut errbuf)?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
if n > 0 {
|
||||
error!(
|
||||
"wal-redo-postgres: {}",
|
||||
String::from_utf8_lossy(&errbuf[0..n])
|
||||
);
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
}
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = self.poll_fds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
|
||||
if self.page_pos == BLCKSZ as usize {
|
||||
if let Some(sender) = self.resp_deque.pop_front() {
|
||||
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"Malformed output of walredo process",
|
||||
));
|
||||
}
|
||||
self.page_pos = 0;
|
||||
}
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all,fields(tenant_id=%self.tenant_id))]
|
||||
fn send(&mut self, sender: Sender<Bytes>, data: Vec<u8>) -> Result<(), Error> {
|
||||
let mut written = 0usize;
|
||||
let data_len = data.len();
|
||||
self.resp_deque.push_back(sender);
|
||||
while written < data_len {
|
||||
let n = loop {
|
||||
match nix::poll::poll(
|
||||
&mut self.poll_fds[0..3],
|
||||
self.wal_redo_timeout.as_millis() as i32,
|
||||
) {
|
||||
Err(e) if e == nix::errno::Errno::EINTR => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
let err_revents = self.poll_fds[1].revents().unwrap();
|
||||
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
let mut errbuf: [u8; ERR_BUF_SIZE] = [0; ERR_BUF_SIZE];
|
||||
let n = self.stderr.read(&mut errbuf)?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
if n > 0 {
|
||||
panic!(
|
||||
"wal-redo-postgres: {}",
|
||||
String::from_utf8_lossy(&errbuf[0..n])
|
||||
);
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
}
|
||||
|
||||
let in_revents = self.poll_fds[2].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
written += self.stdin.write(&data[written..data_len])?;
|
||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdin unexpectedly",
|
||||
));
|
||||
}
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = self.poll_fds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
self.page_pos += self.stdout.read(&mut self.page[self.page_pos..])?;
|
||||
if self.page_pos == BLCKSZ as usize {
|
||||
if let Some(sender) = self.resp_deque.pop_front() {
|
||||
sender.send(Bytes::copy_from_slice(&self.page)).unwrap();
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"Malformed output of walredo process",
|
||||
));
|
||||
}
|
||||
self.page_pos = 0;
|
||||
}
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
|
||||
#[instrument(skip_all,fields(tenant_id=%tenant_id))]
|
||||
fn launch(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
@@ -605,7 +762,6 @@ impl PostgresRedoProcess {
|
||||
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
|
||||
TEMP_FILE_SUFFIX,
|
||||
);
|
||||
|
||||
// Create empty data directory for wal-redo postgres, deleting old one first.
|
||||
if datadir.exists() {
|
||||
info!(
|
||||
@@ -627,9 +783,14 @@ impl PostgresRedoProcess {
|
||||
)
|
||||
})?;
|
||||
|
||||
info!("running initdb in {}", datadir.display());
|
||||
info!(
|
||||
"running initdb in {} for pg_verson={}",
|
||||
datadir.display(),
|
||||
pg_version
|
||||
);
|
||||
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
|
||||
.args(&["-D", &datadir.to_string_lossy()])
|
||||
.arg("-n")
|
||||
.arg("-N")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
|
||||
@@ -709,152 +870,25 @@ impl PostgresRedoProcess {
|
||||
// all fallible operations post-spawn are complete, so get rid of the guard
|
||||
let child = scopeguard::ScopeGuard::into_inner(child);
|
||||
|
||||
let poll_fds = [
|
||||
PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
|
||||
PollFd::new(stderr.as_raw_fd(), PollFlags::POLLIN),
|
||||
PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
|
||||
];
|
||||
|
||||
Ok(PostgresRedoProcess {
|
||||
tenant_id,
|
||||
child,
|
||||
_child: child,
|
||||
stdin,
|
||||
stdout,
|
||||
stderr,
|
||||
wal_redo_timeout: conf.wal_redo_timeout,
|
||||
resp_deque: VecDeque::with_capacity(N_CHANNELS),
|
||||
page: [0u8; BLCKSZ as usize],
|
||||
page_pos: 0,
|
||||
poll_fds,
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
|
||||
fn kill(self) {
|
||||
self.child.kill_and_wait();
|
||||
}
|
||||
|
||||
//
|
||||
// Apply given WAL records ('records') over an old page image. Returns
|
||||
// new page image.
|
||||
//
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
|
||||
fn apply_wal_records(
|
||||
&mut self,
|
||||
tag: BufferTag,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
// but in practice the number of records is usually so small that it doesn't
|
||||
// matter, and it's better to keep this code simple.
|
||||
//
|
||||
// Most requests start with a before-image with BLCKSZ bytes, followed by
|
||||
// by some other WAL records. Start with a buffer that can hold that
|
||||
// comfortably.
|
||||
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
|
||||
build_begin_redo_for_block_msg(tag, &mut writebuf);
|
||||
if let Some(img) = base_img {
|
||||
build_push_page_msg(tag, &img, &mut writebuf);
|
||||
}
|
||||
for (lsn, rec) in records.iter() {
|
||||
if let NeonWalRecord::Postgres {
|
||||
will_init: _,
|
||||
rec: postgres_rec,
|
||||
} = rec
|
||||
{
|
||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
"tried to pass neon wal record to postgres WAL redo",
|
||||
));
|
||||
}
|
||||
}
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
// The input is now in 'writebuf'. Do a blind write first, writing as much as
|
||||
// we can, before calling poll(). That skips one call to poll() if the stdin is
|
||||
// already available for writing, which it almost certainly is because the
|
||||
// process is idle.
|
||||
let mut nwrite = self.stdin.write(&writebuf)?;
|
||||
|
||||
// We expect the WAL redo process to respond with an 8k page image. We read it
|
||||
// into this buffer.
|
||||
let mut resultbuf = vec![0; BLCKSZ.into()];
|
||||
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
|
||||
|
||||
// Prepare for calling poll()
|
||||
let mut pollfds = [
|
||||
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
|
||||
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
|
||||
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
|
||||
];
|
||||
|
||||
// We do three things simultaneously: send the old base image and WAL records to
|
||||
// the child process's stdin, read the result from child's stdout, and forward any logging
|
||||
// information that the child writes to its stderr to the page server's log.
|
||||
while nresult < BLCKSZ.into() {
|
||||
// If we have more data to write, wake up if 'stdin' becomes writeable or
|
||||
// we have data to read. Otherwise only wake up if there's data to read.
|
||||
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(e) if e == nix::errno::Errno::EINTR => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
let err_revents = pollfds[1].revents().unwrap();
|
||||
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
let mut errbuf: [u8; 16384] = [0; 16384];
|
||||
let n = self.stderr.read(&mut errbuf)?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
if n > 0 {
|
||||
error!(
|
||||
"wal-redo-postgres: {}",
|
||||
String::from_utf8_lossy(&errbuf[0..n])
|
||||
);
|
||||
|
||||
// To make sure we capture all log from the process if it fails, keep
|
||||
// reading from the stderr, before checking the stdout.
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
}
|
||||
|
||||
// If we have more data to write and 'stdin' is writeable, do write.
|
||||
if nwrite < writebuf.len() {
|
||||
let in_revents = pollfds[2].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += self.stdin.write(&writebuf[nwrite..])?;
|
||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdin unexpectedly",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = pollfds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Bytes::from(resultbuf))
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper type around `std::process::Child` which guarantees that the child
|
||||
|
||||
Reference in New Issue
Block a user