diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e21ec4d742..da25ba539d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -21,6 +21,7 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; use nix::poll::*; +use once_cell::sync::Lazy; use serde::Serialize; use std::fs; use std::fs::OpenOptions; @@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::process::Stdio; use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Condvar, Mutex}; use std::time::Duration; use std::time::Instant; use tracing::*; @@ -55,6 +57,9 @@ use postgres_ffi::v14::nonrelfile_utils::{ }; use postgres_ffi::BLCKSZ; +/// Maximum number of WAL redo processes to launch for a single tenant. +const MAX_PROCESSES: usize = 4; + /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. /// @@ -88,18 +93,32 @@ pub trait WalRedoManager: Send + Sync { ) -> Result; } +static WAL_REDO_PROCESS_COUNTER: Lazy = Lazy::new(|| { AtomicU64::new(0) }); + /// -/// This is the real implementation that uses a Postgres process to -/// perform WAL replay. Only one thread can use the process at a time, -/// that is controlled by the Mutex. In the future, we might want to -/// launch a pool of processes to allow concurrent replay of multiple -/// records. +/// This is the real implementation that uses a special Postgres +/// process to perform WAL replay. There is a pool of these processes. /// pub struct PostgresRedoManager { tenant_id: TenantId, conf: &'static PageServerConf, - process: Mutex>, + /// Pool of processes. + process_list: Mutex, + /// Condition variable that can be used to sleep until a process + /// becomes available in the pool. + condvar: Condvar, +} + +// A pool of WAL redo processes +#[derive(Default)] +struct ProcessList { + /// processes that are available for reuse + free_processes: Vec, + + /// Total number of processes, including all the processes in + /// 'free_processes' list, and any processes that are in use. + num_processes: usize, } /// Can this request be served by neon redo functions @@ -204,7 +223,32 @@ impl PostgresRedoManager { PostgresRedoManager { tenant_id, conf, - process: Mutex::new(None), + process_list: Mutex::new(ProcessList::default()), + condvar: Condvar::new(), + } + } + + // Get a handle to a redo process from the pool. + fn get_process(&self, pg_version: u32) -> Result { + let mut process_list = self.process_list.lock().unwrap(); + + loop { + // If there's a free process immediately available, take it. + if let Some(process) = process_list.free_processes.pop() { + return Ok(process); + } + + // All processes are in use. If the pool is at its maximum size + // already, wait for a process to become free. Otherwise launch + // a new process. + if process_list.num_processes >= MAX_PROCESSES { + process_list = self.condvar.wait(process_list).unwrap(); + continue; + } else { + let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?; + process_list.num_processes += 1; + return Ok(process); + } } } @@ -224,15 +268,9 @@ impl PostgresRedoManager { let start_time = Instant::now(); - let mut process_guard = self.process.lock().unwrap(); - let lock_time = Instant::now(); + let mut process = self.get_process(pg_version)?; - // launch the WAL redo process on first use - if process_guard.is_none() { - let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?; - *process_guard = Some(p); - } - let process = process_guard.as_mut().unwrap(); + let lock_time = Instant::now(); WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); @@ -266,8 +304,9 @@ impl PostgresRedoManager { lsn ); - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. + // If something went wrong, don't try to reuse the + // process. Kill it, and next request will launch a new one. + // Otherwise return the process to the pool. if result.is_err() { error!( "error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}", @@ -275,8 +314,14 @@ impl PostgresRedoManager { nbytes, lsn ); - let process = process_guard.take().unwrap(); process.kill(); + let mut process_list = self.process_list.lock().unwrap(); + process_list.num_processes -= 1; + self.condvar.notify_one(); + } else { + let mut process_list = self.process_list.lock().unwrap(); + process_list.free_processes.push(process); + self.condvar.notify_one(); } result } @@ -594,11 +639,10 @@ impl PostgresRedoProcess { tenant_id: &TenantId, pg_version: u32, ) -> Result { - // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we - // just create one with constant name. That fails if you try to launch more than - // one WAL redo manager concurrently. + // We need a dummy Postgres cluster to run the process in. + let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed); let datadir = path_with_suffix_extension( - conf.tenant_path(tenant_id).join("wal-redo-datadir"), + conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)), TEMP_FILE_SUFFIX, );