From 3cd2a3f9317f54e75c00a813680060dcaa612a36 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 17 Sep 2024 20:16:33 +0100 Subject: [PATCH] refactor(walredo): process launch & kill-on-error machinery (#8951) Immediate benefit: easier to spot what's going on. Later benefit: use the extracted method in PR - https://github.com/neondatabase/neon/pull/8952 which adds a `ping` command to walredo. Found this useful during investigation https://github.com/neondatabase/cloud/issues/16886. --- pageserver/src/walredo.rs | 255 +++++++++++++++++++++----------------- 1 file changed, 140 insertions(+), 115 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a36955fa21..0fe7def8b0 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -35,6 +35,7 @@ use anyhow::Context; use bytes::{Bytes, BytesMut}; use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus}; use pageserver_api::shard::TenantShardId; +use std::future::Future; use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -296,6 +297,97 @@ impl PostgresRedoManager { } } + async fn do_with_walredo_process< + F: FnOnce(Arc) -> Fut, + Fut: Future>, + O, + >( + &self, + pg_version: u32, + closure: F, + ) -> Result { + let proc: Arc = match self.redo_process.get_or_init_detached().await { + Ok(guard) => match &*guard { + ProcessOnceCell::Spawned(proc) => Arc::clone(proc), + ProcessOnceCell::ManagerShutDown => { + return Err(Error::Cancelled); + } + }, + Err(permit) => { + let start = Instant::now(); + // acquire guard before spawning process, so that we don't spawn new processes + // if the gate is already closed. + let _launched_processes_guard = match self.launched_processes.enter() { + Ok(guard) => guard, + Err(GateError::GateClosed) => unreachable!( + "shutdown sets the once cell to `ManagerShutDown` state before closing the gate" + ), + }; + let proc = Arc::new(Process { + process: process::WalRedoProcess::launch( + self.conf, + self.tenant_shard_id, + pg_version, + ) + .context("launch walredo process")?, + _launched_processes_guard, + }); + let duration = start.elapsed(); + WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64()); + info!( + elapsed_ms = duration.as_millis(), + pid = proc.id(), + "launched walredo process" + ); + self.redo_process + .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); + proc + } + }; + + // async closures are unstable, would support &Process + let result = closure(proc.clone()).await; + + if result.is_err() { + // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation. + // Note that there may be other tasks concurrent with us that also hold `proc`. + // We have to deal with that here. + // Also read the doc comment on field `self.redo_process`. + // + // NB: there may still be other concurrent threads using `proc`. + // The last one will send SIGKILL when the underlying Arc reaches refcount 0. + // + // NB: the drop impl blocks the dropping thread with a wait() system call for + // the child process. In some ways the blocking is actually good: if we + // deferred the waiting into the background / to tokio if we used `tokio::process`, + // it could happen that if walredo always fails immediately, we spawn processes faster + // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, + // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. + // This probably needs revisiting at some later point. + match self.redo_process.get() { + None => (), + Some(guard) => { + match &*guard { + ProcessOnceCell::ManagerShutDown => {} + ProcessOnceCell::Spawned(guard_proc) => { + if Arc::ptr_eq(&proc, guard_proc) { + // We're the first to observe an error from `proc`, it's our job to take it out of rotation. + guard.take_and_deinit(); + } else { + // Another task already spawned another redo process (further up in this method) + // and put it into `redo_process`. Do nothing, our view of the world is behind. + } + } + } + } + } + // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall. + drop(proc); + } + + result + } + /// /// Process one request for WAL redo using wal-redo postgres /// @@ -319,130 +411,63 @@ impl PostgresRedoManager { const MAX_RETRY_ATTEMPTS: u32 = 1; let mut n_attempts = 0u32; loop { - let proc: Arc = match self.redo_process.get_or_init_detached().await { - Ok(guard) => match &*guard { - ProcessOnceCell::Spawned(proc) => Arc::clone(proc), - ProcessOnceCell::ManagerShutDown => { - return Err(Error::Cancelled); - } - }, - Err(permit) => { - let start = Instant::now(); - // acquire guard before spawning process, so that we don't spawn new processes - // if the gate is already closed. - let _launched_processes_guard = match self.launched_processes.enter() { - Ok(guard) => guard, - Err(GateError::GateClosed) => unreachable!( - "shutdown sets the once cell to `ManagerShutDown` state before closing the gate" - ), - }; - let proc = Arc::new(Process { - process: process::WalRedoProcess::launch( - self.conf, - self.tenant_shard_id, - pg_version, - ) - .context("launch walredo process")?, - _launched_processes_guard, - }); - let duration = start.elapsed(); - WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64()); - info!( - duration_ms = duration.as_millis(), - pid = proc.id(), - "launched walredo process" - ); - self.redo_process - .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); - proc - } - }; + let base_img = &base_img; + let closure = |proc: Arc| async move { + let started_at = std::time::Instant::now(); - let started_at = std::time::Instant::now(); + // Relational WAL records are applied using wal-redo-postgres + let result = proc + .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) + .await + .context("apply_wal_records"); - // Relational WAL records are applied using wal-redo-postgres - let result = proc - .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout) - .await - .context("apply_wal_records"); + let duration = started_at.elapsed(); - let duration = started_at.elapsed(); - - let len = records.len(); - let nbytes = records.iter().fold(0, |acumulator, record| { - acumulator - + match &record.1 { - NeonWalRecord::Postgres { rec, .. } => rec.len(), - _ => unreachable!("Only PostgreSQL records are accepted in this batch"), - } - }); - - WAL_REDO_TIME.observe(duration.as_secs_f64()); - WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); - WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); - - debug!( - "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", - len, - nbytes, - duration.as_micros(), - lsn - ); - - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. - if let Err(e) = result.as_ref() { - error!( - "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}", - records.len(), - records.first().map(|p| p.0).unwrap_or(Lsn(0)), - records.last().map(|p| p.0).unwrap_or(Lsn(0)), - nbytes, - base_img_lsn, - lsn, - n_attempts, - e, - ); - // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation. - // Note that there may be other tasks concurrent with us that also hold `proc`. - // We have to deal with that here. - // Also read the doc comment on field `self.redo_process`. - // - // NB: there may still be other concurrent threads using `proc`. - // The last one will send SIGKILL when the underlying Arc reaches refcount 0. - // - // NB: the drop impl blocks the dropping thread with a wait() system call for - // the child process. In some ways the blocking is actually good: if we - // deferred the waiting into the background / to tokio if we used `tokio::process`, - // it could happen that if walredo always fails immediately, we spawn processes faster - // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, - // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. - // This probably needs revisiting at some later point. - match self.redo_process.get() { - None => (), - Some(guard) => { - match &*guard { - ProcessOnceCell::ManagerShutDown => {} - ProcessOnceCell::Spawned(guard_proc) => { - if Arc::ptr_eq(&proc, guard_proc) { - // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - guard.take_and_deinit(); - } else { - // Another task already spawned another redo process (further up in this method) - // and put it into `redo_process`. Do nothing, our view of the world is behind. - } - } + let len = records.len(); + let nbytes = records.iter().fold(0, |acumulator, record| { + acumulator + + match &record.1 { + NeonWalRecord::Postgres { rec, .. } => rec.len(), + _ => unreachable!("Only PostgreSQL records are accepted in this batch"), } - } + }); + + WAL_REDO_TIME.observe(duration.as_secs_f64()); + WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); + WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); + + debug!( + "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", + len, + nbytes, + duration.as_micros(), + lsn + ); + + if let Err(e) = result.as_ref() { + error!( + "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}", + records.len(), + records.first().map(|p| p.0).unwrap_or(Lsn(0)), + records.last().map(|p| p.0).unwrap_or(Lsn(0)), + nbytes, + base_img_lsn, + lsn, + n_attempts, + e, + ); } - // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall. - drop(proc); - } else if n_attempts != 0 { + + result.map_err(Error::Other) + }; + let result = self.do_with_walredo_process(pg_version, closure).await; + + if result.is_ok() && n_attempts != 0 { info!(n_attempts, "retried walredo succeeded"); } n_attempts += 1; if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() { - return result.map_err(Error::Other); + return result; } } }