mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Launch multiple wal-redo postgres instances
This commit is contained in:
@@ -32,6 +32,7 @@ use std::os::unix::io::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -53,6 +54,8 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::XLogRecord;
|
||||
|
||||
const N_WAL_REDO_PROCS: usize = 1;
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
///
|
||||
@@ -139,7 +142,8 @@ pub struct PostgresRedoManager {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
round_robin: AtomicUsize,
|
||||
processes: [Mutex<Option<PostgresRedoProcess>>; N_WAL_REDO_PROCS],
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -209,12 +213,13 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
end_time = Instant::now();
|
||||
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
|
||||
} else {
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
let rr = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_WAL_REDO_PROCS;
|
||||
let mut process_guard = self.processes[rr].lock().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?;
|
||||
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid, rr)?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
@@ -246,7 +251,8 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenantid,
|
||||
conf,
|
||||
process: Mutex::new(None),
|
||||
round_robin: AtomicUsize::new(0),
|
||||
processes: [(); N_WAL_REDO_PROCS].map(|_| Mutex::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -472,11 +478,17 @@ impl PostgresRedoProcess {
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
|
||||
fn launch(
|
||||
conf: &PageServerConf,
|
||||
tenantid: &ZTenantId,
|
||||
id: usize,
|
||||
) -> Result<PostgresRedoProcess, Error> {
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
|
||||
let datadir = conf
|
||||
.tenant_path(tenantid)
|
||||
.join(format! {"wal-redo-datadir-{}", id});
|
||||
|
||||
// Create empty data directory for wal-redo postgres, deleting old one first.
|
||||
if datadir.exists() {
|
||||
|
||||
Reference in New Issue
Block a user