Compare commits

...

8 Commits

Author SHA1 Message Date
Christian Schwarz
c8a3b6f930 the fix: shutdown method for WalRedoManager, so the Arc<> can outlive the process 2024-06-24 19:29:31 +00:00
Christian Schwarz
6779c908eb Revert "WIP: solution approach 1: propagate cancellationtoken from tenant"
This reverts commit 5202d2dc98.
2024-06-24 19:04:02 +00:00
Christian Schwarz
5202d2dc98 WIP: solution approach 1: propagate cancellationtoken from tenant
not enough, we need its gate, and there's no concept of child gate
2024-06-24 19:03:30 +00:00
Christian Schwarz
d7e0c99616 trim down the PR to just keeping track of walredo processes 2024-06-24 18:46:29 +00:00
Christian Schwarz
7507d137de WIP 2024-06-24 18:25:57 +00:00
Christian Schwarz
34f42669fa WIP 2024-06-24 17:56:36 +00:00
Christian Schwarz
943220df9b implement a global gate + cancellation mechanism for live walredo processes, hook up to shutdown 2024-06-24 17:18:54 +00:00
Christian Schwarz
5e0ef715aa add distinguished "Cancelled" error for walredo (don't use it yet) 2024-06-24 16:05:04 +00:00
6 changed files with 211 additions and 46 deletions

View File

@@ -394,6 +394,10 @@ fn start_pageserver(
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}
// Set up global state shared by all walredo processes.
let walredo_global_state =
BACKGROUND_RUNTIME.block_on(pageserver::walredo::GlobalState::spawn(conf));
// Up to this point no significant I/O has been done: this should have been fast. Record
// duration prior to starting I/O intensive phase of startup.
startup_checkpoint(started_startup_at, "initial", "Starting loading tenants");
@@ -429,6 +433,7 @@ fn start_pageserver(
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
walredo_global_state: Arc::clone(&walredo_global_state),
},
order,
shutdown_pageserver.clone(),
@@ -689,7 +694,13 @@ fn start_pageserver(
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
pageserver::shutdown_pageserver(
&tenant_manager,
deletion_queue.clone(),
walredo_global_state,
0,
)
.await;
unreachable!()
})
}

View File

@@ -11,6 +11,8 @@ pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
use std::sync::Arc;
pub use pageserver_api::keyspace;
pub mod aux_file;
pub mod metrics;
@@ -58,6 +60,7 @@ pub use crate::metrics::preinitialize_metrics;
pub async fn shutdown_pageserver(
tenant_manager: &TenantManager,
mut deletion_queue: DeletionQueue,
walredo_global_state: Arc<walredo::GlobalState>,
exit_code: i32,
) {
use std::time::Duration;
@@ -79,6 +82,18 @@ pub async fn shutdown_pageserver(
)
.await;
// walredo processes are tenant-scoped and should have been shut down after tenant manager shutdown above.
//
// In practice, we have lingering walredo processes even when pageserver shuts down cleanly, i.e., even when it
// does not hit systemd's TimeoutSec timeout (10 seconds in prod).
// TODO: understand why the processes aren't gone by the time tenant_manager.shutdown() above returns.
timed(
walredo_global_state.shutdown(),
"wait for all walredo processes to exit",
Duration::from_secs(1),
)
.await;
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been cancelled via mgr::shutdown_all_tenants
timed(

View File

@@ -88,6 +88,7 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::remote_timeline_client::INITDB_PATH;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::walredo::Error;
use crate::InitializationOrder;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
@@ -165,6 +166,7 @@ pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub walredo_global_state: Arc<crate::walredo::GlobalState>,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
@@ -323,6 +325,17 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) async fn shutdown(&self) {
match self {
Self::Prod(mgr) => mgr.shutdown().await,
#[cfg(test)]
Self::Test(mgr) => {
// Not applicable to test redo manager
}
}
}
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
@@ -343,7 +356,7 @@ impl WalRedoManager {
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<bytes::Bytes> {
) -> Result<bytes::Bytes, Error> {
match self {
Self::Prod(mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
@@ -649,17 +662,18 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
let TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
walredo_global_state,
} = resources;
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
walredo_global_state,
tenant_shard_id,
)));
let attach_mode = attached_conf.location.attach_mode;
let generation = attached_conf.location.generation;
@@ -1877,6 +1891,10 @@ impl Tenant {
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
if let Some(walredo_mgr) = self.walredo_mgr.as_ref() {
walredo_mgr.shutdown().await;
}
// Wait for any in-flight operations to complete
self.gate.close().await;
@@ -3953,7 +3971,7 @@ pub(crate) mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.

View File

@@ -5235,10 +5235,16 @@ impl Timeline {
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("reconstruct a page image")
{
Ok(img) => img,
Err(e) => return Err(PageReconstructError::WalRedo(e)),
Err(e) => {
return Err(match e {
crate::walredo::Error::Cancelled => PageReconstructError::Cancelled,
crate::walredo::Error::Other(e) => {
PageReconstructError::WalRedo(e.context("reconstruct a page image"))
}
})
}
};
Ok(img)

View File

@@ -40,8 +40,30 @@ use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use utils::sync::heavier_once_cell;
pub struct GlobalState {
conf: &'static PageServerConf,
pub(self) spawn_gate: Gate,
}
impl GlobalState {
pub async fn spawn(conf: &'static PageServerConf) -> Arc<GlobalState> {
let state = Arc::new(GlobalState {
conf,
spawn_gate: Gate::default(),
});
state
}
pub(crate) async fn shutdown(self: &Arc<Self>) {
self.spawn_gate.close().await
// The destructor of WalRedoProcess SIGKILLs and `wait()`s for the process
// The gate guard is stored in WalRedoProcess.
// So, we arrive here once all WalRedoProcess structs are gone.
}
}
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
@@ -50,8 +72,8 @@ use utils::sync::heavier_once_cell;
/// records.
///
pub struct PostgresRedoManager {
global_state: Arc<GlobalState>,
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
/// The current [`process::WalRedoProcess`] that is used by new redo requests.
/// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
@@ -65,9 +87,30 @@ pub struct PostgresRedoManager {
/// still be using the old redo process. But, those other tasks will most likely
/// encounter an error as well, and errors are an unexpected condition anyway.
/// So, probably we could get rid of the `Arc` in the future.
redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
redo_process: heavier_once_cell::OnceCell<RedoProcessState>,
launch_process_gate: Gate,
}
enum RedoProcessState {
Launched(Arc<process::WalRedoProcess>),
ManagerShutDown,
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
macro_rules! bail {
($($arg:tt)*) => {
return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
}
}
pub(self) use bail;
///
/// Public interface of WAL redo manager
///
@@ -88,9 +131,9 @@ impl PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
if records.is_empty() {
anyhow::bail!("invalid WAL redo request with no records");
bail!("invalid WAL redo request with no records");
}
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
@@ -110,7 +153,7 @@ impl PostgresRedoManager {
img,
base_img_lsn,
&records[batch_start..i],
self.conf.wal_redo_timeout,
self.global_state.conf.wal_redo_timeout,
pg_version,
)
.await
@@ -131,7 +174,7 @@ impl PostgresRedoManager {
img,
base_img_lsn,
&records[batch_start..],
self.conf.wal_redo_timeout,
self.global_state.conf.wal_redo_timeout,
pg_version,
)
.await
@@ -148,10 +191,10 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
process: self
.redo_process
.get()
.map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
process: self.redo_process.get().and_then(|p| match &*p {
RedoProcessState::Launched(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
RedoProcessState::ManagerShutDown => None,
}),
}
}
}
@@ -161,18 +204,36 @@ impl PostgresRedoManager {
/// Create a new PostgresRedoManager.
///
pub fn new(
conf: &'static PageServerConf,
global_state: Arc<GlobalState>,
tenant_shard_id: TenantShardId,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
global_state,
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: heavier_once_cell::OnceCell::default(),
launch_process_gate: Gate::default(),
}
}
pub async fn shutdown(&self) {
// prevent new launches
let permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
permit
}
Err(permit) => permit,
};
self.redo_process
.set(RedoProcessState::ManagerShutDown, permit);
// wait for all WalRedoProcess objects to get dropped
self.launch_process_gate.close().await;
}
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
@@ -203,7 +264,7 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
@@ -212,17 +273,27 @@ impl PostgresRedoManager {
loop {
let proc: Arc<process::WalRedoProcess> =
match self.redo_process.get_or_init_detached().await {
Ok(guard) => Arc::clone(&guard),
Ok(guard) => match &*guard {
RedoProcessState::Launched(proc) => Arc::clone(proc),
RedoProcessState::ManagerShutDown => {
return Err(Error::Cancelled);
}
},
Err(permit) => {
// don't hold poison_guard, the launch code can bail
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
self.conf,
&self.global_state,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
.map_err(|e| match e {
process::LaunchError::Cancelled => Error::Cancelled,
process::LaunchError::Other(e) => {
Error::Other(e.context("launch walredo process"))
}
})?,
);
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
@@ -231,7 +302,8 @@ impl PostgresRedoManager {
pid = proc.id(),
"launched walredo process"
);
self.redo_process.set(Arc::clone(&proc), permit);
self.redo_process
.set(RedoProcessState::Launched(Arc::clone(&proc)), permit);
proc
}
};
@@ -242,7 +314,14 @@ impl PostgresRedoManager {
let result = proc
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
.await
.context("apply_wal_records");
.map_err(|e| match e {
Error::Cancelled => Error::Cancelled,
Error::Other(e) => Error::Other(e.context("apply_wal_records")),
});
if matches!(result, Err(Error::Cancelled)) {
// bail asap and also avoid log noise due to the error reporting below
return Err(Error::Cancelled);
}
let duration = started_at.elapsed();
@@ -299,12 +378,17 @@ impl PostgresRedoManager {
match self.redo_process.get() {
None => (),
Some(guard) => {
if Arc::ptr_eq(&proc, &*guard) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
guard.take_and_deinit();
} else {
// Another task already spawned another redo process (further up in this method)
// and put it into `redo_process`. Do nothing, our view of the world is behind.
match &*guard {
RedoProcessState::ManagerShutDown => {}
RedoProcessState::Launched(guard_proc) => {
if Arc::ptr_eq(&proc, &*guard_proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
guard.take_and_deinit();
} else {
// Another task already spawned another redo process (further up in this method)
// and put it into `redo_process`. Do nothing, our view of the world is behind.
}
}
}
}
}
@@ -329,7 +413,7 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
let start_time = Instant::now();
let mut page = BytesMut::new();
@@ -338,7 +422,7 @@ impl PostgresRedoManager {
page.extend_from_slice(&fpi[..]);
} else {
// All the current WAL record types that we can handle require a base image.
anyhow::bail!("invalid neon WAL redo request with no base image");
bail!("invalid neon WAL redo request with no base image");
}
// Apply all the WAL records in the batch

View File

@@ -18,13 +18,21 @@ use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
process::{Command, Stdio},
sync::Arc,
time::Duration,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, poison::Poison};
use utils::{
lsn::Lsn,
poison::Poison,
sync::gate::{GateError, GateGuard},
};
use super::GlobalState;
pub struct WalRedoProcess {
_spawn_gate_guard: GateGuard,
#[allow(dead_code)]
conf: &'static PageServerConf,
#[cfg(feature = "testing")]
@@ -49,18 +57,33 @@ struct ProcessOutput {
n_processed_responses: usize,
}
#[derive(Debug, thiserror::Error)]
pub(super) enum LaunchError {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
conf: &'static PageServerConf,
global_state: &Arc<GlobalState>,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
) -> Result<Self, LaunchError> {
crate::span::debug_assert_current_span_has_tenant_id();
let conf = global_state.conf;
let spawn_gate_guard = match global_state.spawn_gate.enter() {
Ok(guard) => guard,
Err(GateError::GateClosed) => return Err(LaunchError::Cancelled),
};
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
@@ -144,6 +167,7 @@ impl WalRedoProcess {
);
Ok(Self {
_spawn_gate_guard: spawn_gate_guard,
conf,
#[cfg(feature = "testing")]
tenant_shard_id,
@@ -189,7 +213,7 @@ impl WalRedoProcess {
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, super::Error> {
debug_assert_current_span_has_tenant_id();
let tag = protocol::BufferTag { rel, blknum };
@@ -216,17 +240,19 @@ impl WalRedoProcess {
{
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
super::bail!("tried to pass neon wal record to postgres WAL redo");
}
}
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let Ok(res) =
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
else {
anyhow::bail!("WAL redo timed out");
};
let res =
// TODO: we should tokio::select! on the self.global_state.shutdown here,
// but, that requires thinking through the perf implications
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf))
.await
.map_err(|_elapsed| anyhow::anyhow!("WAL redo timed out"))?
.map_err(super::Error::Other);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
@@ -377,6 +403,11 @@ impl Drop for WalRedoProcess {
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
// The spawn gate is supposed to track running walredo processes.
// => must keep guard alive until the process is dead.
let _ = &self._spawn_gate_guard;
// no way to wait for stderr_logger_task from Drop because that is async only
}
}