Have a pool of WAL redo processes per tenant

To allow more concurrency, have a pool of WAL redo processes that can
grow up to 4 processes per tenant. There's no way to shrink the pool,
that's why I'm capping it at 4 processes, to keep the total number of
processes reasonable.
This commit is contained in:
Heikki Linnakangas
2022-04-22 00:32:47 +03:00
committed by Heikki Linnakangas
parent c1a76eb0e5
commit c82ab848de

View File

@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
@@ -55,6 +57,9 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
/// Maximum number of WAL redo processes to launch for a single tenant.
const MAX_PROCESSES: usize = 4;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -88,18 +93,32 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// This is the real implementation that uses a special Postgres
/// process to perform WAL replay. There is a pool of these processes.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
/// Pool of processes.
process_list: Mutex<ProcessList>,
/// Condition variable that can be used to sleep until a process
/// becomes available in the pool.
condvar: Condvar,
}
// A pool of WAL redo processes
#[derive(Default)]
struct ProcessList {
/// processes that are available for reuse
free_processes: Vec<PostgresRedoProcess>,
/// Total number of processes, including all the processes in
/// 'free_processes' list, and any processes that are in use.
num_processes: usize,
}
/// Can this request be served by neon redo functions
@@ -204,7 +223,32 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
process_list: Mutex::new(ProcessList::default()),
condvar: Condvar::new(),
}
}
// Get a handle to a redo process from the pool.
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
let mut process_list = self.process_list.lock().unwrap();
loop {
// If there's a free process immediately available, take it.
if let Some(process) = process_list.free_processes.pop() {
return Ok(process);
}
// All processes are in use. If the pool is at its maximum size
// already, wait for a process to become free. Otherwise launch
// a new process.
if process_list.num_processes >= MAX_PROCESSES {
process_list = self.condvar.wait(process_list).unwrap();
continue;
} else {
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
process_list.num_processes += 1;
return Ok(process);
}
}
}
@@ -224,15 +268,9 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
let mut process = self.get_process(pg_version)?;
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
let lock_time = Instant::now();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
@@ -266,8 +304,9 @@ impl PostgresRedoManager {
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
// If something went wrong, don't try to reuse the
// process. Kill it, and next request will launch a new one.
// Otherwise return the process to the pool.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
@@ -275,8 +314,14 @@ impl PostgresRedoManager {
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
let mut process_list = self.process_list.lock().unwrap();
process_list.num_processes -= 1;
self.condvar.notify_one();
} else {
let mut process_list = self.process_list.lock().unwrap();
process_list.free_processes.push(process);
self.condvar.notify_one();
}
result
}
@@ -594,11 +639,10 @@ impl PostgresRedoProcess {
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
// We need a dummy Postgres cluster to run the process in.
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
let datadir = path_with_suffix_extension(
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
TEMP_FILE_SUFFIX,
);