diff --git a/Cargo.toml b/Cargo.toml index 0057a617f6..3c6077648e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,7 +170,6 @@ tokio-postgres-rustls = "0.11.0" tokio-rustls = "0.25" tokio-stream = "0.1" tokio-tar = "0.3" -tokio-test = "0.4.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.7" toml_edit = "0.19" diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 658e0ddbff..1837da34ce 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -58,6 +58,7 @@ pub mod defaults { pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT; pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s"; + pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; pub const DEFAULT_SUPERUSER: &str = "cloud_admin"; @@ -106,6 +107,7 @@ pub mod defaults { #listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}' #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}' +#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}' #page_cache_size = {DEFAULT_PAGE_CACHE_SIZE} #max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS} @@ -180,6 +182,8 @@ pub struct PageServerConf { // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. pub wait_lsn_timeout: Duration, + // How long to wait for WAL redo to complete. + pub wal_redo_timeout: Duration, pub superuser: String, @@ -349,6 +353,7 @@ struct PageServerConfigBuilder { availability_zone: BuilderValue>, wait_lsn_timeout: BuilderValue, + wal_redo_timeout: BuilderValue, superuser: BuilderValue, @@ -421,6 +426,8 @@ impl PageServerConfigBuilder { availability_zone: Set(None), wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT) .expect("cannot parse default wait lsn timeout")), + wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT) + .expect("cannot parse default wal redo timeout")), superuser: Set(DEFAULT_SUPERUSER.to_string()), page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE), max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS), @@ -514,6 +521,10 @@ impl PageServerConfigBuilder { self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout) } + pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) { + self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout) + } + pub fn superuser(&mut self, superuser: String) { self.superuser = BuilderValue::Set(superuser) } @@ -695,6 +706,7 @@ impl PageServerConfigBuilder { listen_http_addr, availability_zone, wait_lsn_timeout, + wal_redo_timeout, superuser, page_cache_size, max_file_descriptors, @@ -919,6 +931,7 @@ impl PageServerConf { "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?), "availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)), "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?), + "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?), "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?), "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize), "max_file_descriptors" => { @@ -1054,6 +1067,7 @@ impl PageServerConf { PageServerConf { id: NodeId(0), wait_lsn_timeout: Duration::from_secs(60), + wal_redo_timeout: Duration::from_secs(60), page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE, max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), @@ -1244,6 +1258,7 @@ listen_pg_addr = '127.0.0.1:64000' listen_http_addr = '127.0.0.1:9898' wait_lsn_timeout = '111 s' +wal_redo_timeout = '111 s' page_cache_size = 444 max_file_descriptors = 333 @@ -1284,6 +1299,7 @@ background_task_maximum_delay = '334 s' listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), availability_zone: None, wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?, + wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?, superuser: defaults::DEFAULT_SUPERUSER.to_string(), page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE, max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS, @@ -1335,7 +1351,7 @@ background_task_maximum_delay = '334 s' .expect("Invalid default constant") ), validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, - ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, + ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB }, "Correct defaults should be used when no config values are provided" ); @@ -1365,6 +1381,7 @@ background_task_maximum_delay = '334 s' listen_http_addr: "127.0.0.1:9898".to_string(), availability_zone: None, wait_lsn_timeout: Duration::from_secs(111), + wal_redo_timeout: Duration::from_secs(111), superuser: "zzzz".to_string(), page_cache_size: 444, max_file_descriptors: 333, @@ -1406,7 +1423,7 @@ background_task_maximum_delay = '334 s' .expect("Invalid default constant") ), validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, - ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, + ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e0e7dc029e..63a4aabc35 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -111,6 +111,7 @@ impl PostgresRedoManager { img, base_img_lsn, &records[batch_start..i], + self.conf.wal_redo_timeout, pg_version, ) .await @@ -131,6 +132,7 @@ impl PostgresRedoManager { img, base_img_lsn, &records[batch_start..], + self.conf.wal_redo_timeout, pg_version, ) .await @@ -197,6 +199,7 @@ impl PostgresRedoManager { base_img: Option, base_img_lsn: Lsn, records: &[(Lsn, NeonWalRecord)], + wal_redo_timeout: Duration, pg_version: u32, ) -> anyhow::Result { *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); @@ -235,7 +238,7 @@ impl PostgresRedoManager { // Relational WAL records are applied using wal-redo-postgres let result = proc - .apply_wal_records(rel, blknum, &base_img, records) + .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout) .await .context("apply_wal_records"); diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index 1420685c4d..978edc1e4f 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -13,6 +13,7 @@ use std::sync::atomic::AtomicUsize; use std::{ collections::VecDeque, process::{Command, Stdio}, + time::Duration, }; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{debug, error, instrument, Instrument}; @@ -184,6 +185,7 @@ impl WalRedoProcess { blknum: u32, base_img: &Option, records: &[(Lsn, NeonWalRecord)], + wal_redo_timeout: Duration, ) -> anyhow::Result { let tag = protocol::BufferTag { rel, blknum }; @@ -215,7 +217,11 @@ impl WalRedoProcess { protocol::build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - let res = self.apply_wal_records0(&writebuf).await; + let Ok(res) = + tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await + else { + anyhow::bail!("WAL redo timed out"); + }; if res.is_err() { // not all of these can be caused by this particular input, however these are so rare @@ -228,7 +234,10 @@ impl WalRedoProcess { /// # Cancel-Safety /// - /// Cancellation safe (enforced through the use of [`utils::poison::Poison`]). + /// When not polled to completion (e.g. because in `tokio::select!` another + /// branch becomes ready before this future), concurrent and subsequent + /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls. + /// Dispose of this process instance and create a new one. async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result { let request_no = { let mut lock_guard = self.stdin.lock().await;