mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Have a pool of WAL redo processes per tenant
To allow more concurrency, have a pool of WAL redo processes that can grow up to 4 processes per tenant. There's no way to shrink the pool, that's why I'm capping it at 4 processes, to keep the total number of processes reasonable.
This commit is contained in:
committed by
Heikki Linnakangas
parent
5bd843551c
commit
a726b37bca
@@ -21,6 +21,7 @@
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use std::fs;
|
||||
use std::fs::OpenOptions;
|
||||
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
@@ -55,6 +57,9 @@ use postgres_ffi::v14::nonrelfile_utils::{
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
/// Maximum number of WAL redo processes to launch for a single tenant.
|
||||
const MAX_PROCESSES: usize = 4;
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
///
|
||||
@@ -88,18 +93,32 @@ pub trait WalRedoManager: Send + Sync {
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
}
|
||||
|
||||
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
|
||||
|
||||
///
|
||||
/// This is the real implementation that uses a Postgres process to
|
||||
/// perform WAL replay. Only one thread can use the process at a time,
|
||||
/// that is controlled by the Mutex. In the future, we might want to
|
||||
/// launch a pool of processes to allow concurrent replay of multiple
|
||||
/// records.
|
||||
/// This is the real implementation that uses a special Postgres
|
||||
/// process to perform WAL replay. There is a pool of these processes.
|
||||
///
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_id: TenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
/// Pool of processes.
|
||||
process_list: Mutex<ProcessList>,
|
||||
/// Condition variable that can be used to sleep until a process
|
||||
/// becomes available in the pool.
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
// A pool of WAL redo processes
|
||||
#[derive(Default)]
|
||||
struct ProcessList {
|
||||
/// processes that are available for reuse
|
||||
free_processes: Vec<PostgresRedoProcess>,
|
||||
|
||||
/// Total number of processes, including all the processes in
|
||||
/// 'free_processes' list, and any processes that are in use.
|
||||
num_processes: usize,
|
||||
}
|
||||
|
||||
/// Can this request be served by neon redo functions
|
||||
@@ -204,7 +223,32 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenant_id,
|
||||
conf,
|
||||
process: Mutex::new(None),
|
||||
process_list: Mutex::new(ProcessList::default()),
|
||||
condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// Get a handle to a redo process from the pool.
|
||||
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
|
||||
let mut process_list = self.process_list.lock().unwrap();
|
||||
|
||||
loop {
|
||||
// If there's a free process immediately available, take it.
|
||||
if let Some(process) = process_list.free_processes.pop() {
|
||||
return Ok(process);
|
||||
}
|
||||
|
||||
// All processes are in use. If the pool is at its maximum size
|
||||
// already, wait for a process to become free. Otherwise launch
|
||||
// a new process.
|
||||
if process_list.num_processes >= MAX_PROCESSES {
|
||||
process_list = self.condvar.wait(process_list).unwrap();
|
||||
continue;
|
||||
} else {
|
||||
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
|
||||
process_list.num_processes += 1;
|
||||
return Ok(process);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,15 +268,9 @@ impl PostgresRedoManager {
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
let mut process = self.get_process(pg_version)?;
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
@@ -266,8 +304,9 @@ impl PostgresRedoManager {
|
||||
lsn
|
||||
);
|
||||
|
||||
// If something went wrong, don't try to reuse the process. Kill it, and
|
||||
// next request will launch a new one.
|
||||
// If something went wrong, don't try to reuse the
|
||||
// process. Kill it, and next request will launch a new one.
|
||||
// Otherwise return the process to the pool.
|
||||
if result.is_err() {
|
||||
error!(
|
||||
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
|
||||
@@ -275,8 +314,14 @@ impl PostgresRedoManager {
|
||||
nbytes,
|
||||
lsn
|
||||
);
|
||||
let process = process_guard.take().unwrap();
|
||||
process.kill();
|
||||
let mut process_list = self.process_list.lock().unwrap();
|
||||
process_list.num_processes -= 1;
|
||||
self.condvar.notify_one();
|
||||
} else {
|
||||
let mut process_list = self.process_list.lock().unwrap();
|
||||
process_list.free_processes.push(process);
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
result
|
||||
}
|
||||
@@ -594,11 +639,10 @@ impl PostgresRedoProcess {
|
||||
tenant_id: &TenantId,
|
||||
pg_version: u32,
|
||||
) -> Result<PostgresRedoProcess, Error> {
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
// We need a dummy Postgres cluster to run the process in.
|
||||
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let datadir = path_with_suffix_extension(
|
||||
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
|
||||
conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
|
||||
TEMP_FILE_SUFFIX,
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user