implement a global gate + cancellation mechanism for live walredo processes, hook up to shutdown

@@ -394,6 +394,11 @@ fn start_pageserver(
         deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
     }

+    // Set up global walredo manager state
+    let walredo_global_state = BACKGROUND_RUNTIME.block_on(
+        pageserver::walredo::GlobalState::spawn(conf, shutdown_pageserver.clone()),
+    );
+
     // Up to this point no significant I/O has been done: this should have been fast. Record
     // duration prior to starting I/O intensive phase of startup.
     startup_checkpoint(started_startup_at, "initial", "Starting loading tenants");
@@ -429,6 +434,7 @@ fn start_pageserver(
             broker_client: broker_client.clone(),
             remote_storage: remote_storage.clone(),
             deletion_queue_client,
+            walredo_global_state: Arc::clone(&walredo_global_state),
         },
         order,
         shutdown_pageserver.clone(),

@@ -166,6 +166,7 @@ pub struct TenantSharedResources {
     pub broker_client: storage_broker::BrokerClientChannel,
     pub remote_storage: GenericRemoteStorage,
     pub deletion_queue_client: DeletionQueueClient,
+    pub walredo_global_state: Arc<crate::walredo::GlobalState>,
 }

 /// A [`Tenant`] is really an _attached_ tenant. The configuration
@@ -650,17 +651,18 @@ impl Tenant {
         mode: SpawnMode,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
-        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
-            conf,
-            tenant_shard_id,
-        )));
-
         let TenantSharedResources {
             broker_client,
             remote_storage,
             deletion_queue_client,
+            walredo_global_state,
         } = resources;

+        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
+            walredo_global_state,
+            tenant_shard_id,
+        )));
+
         let attach_mode = attached_conf.location.attach_mode;
         let generation = attached_conf.location.generation;

@@ -35,13 +35,60 @@ use anyhow::Context;
 use bytes::{Bytes, BytesMut};
 use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
 use pageserver_api::shard::TenantShardId;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::lsn::Lsn;
+use utils::sync::gate::Gate;
 use utils::sync::heavier_once_cell;
+
+pub struct GlobalState {
+    conf: &'static PageServerConf,
+    /// Watched by the [`Self::actor`].
+    shutdown: CancellationToken,
+    /// Set by [`Self::actor`] when [`Self::shutdown`] is cancelled.
+    /// We do this to avoid the Mutex lock inside the `CancellationToken`.
+    shutdown_bool: AtomicBool,
+    pub(self) spawn_gate: Gate,
+}
+
+impl GlobalState {
+    pub async fn spawn(
+        conf: &'static PageServerConf,
+        shutdown: CancellationToken,
+    ) -> Arc<GlobalState> {
+        let state = Arc::new(GlobalState {
+            conf,
+            shutdown,
+            shutdown_bool: AtomicBool::new(false), // if `shutdown` is cancelled already, the task spawned below will set it promptly
+            spawn_gate: Gate::default(),
+        });
+        tokio::spawn({
+            let state = Arc::clone(&state);
+            async move {
+                info!("starting");
+                state.actor().await;
+                info!("done");
+            }
+            .instrument(info_span!(parent: None, "walredo_global_state"))
+        });
+        state
+    }
+    async fn actor(self: Arc<Self>) {
+        self.shutdown.cancelled().await;
+        info!("propagating cancellation");
+        self.shutdown_bool.store(true, Ordering::Relaxed);
+        self.spawn_gate.close().await;
+        info!("all walredo processes have been killed and no new ones will be spawned");
+    }
+    pub(self) fn is_shutdown_requested(self: &Arc<Self>) -> bool {
+        self.shutdown_bool.load(Ordering::Relaxed)
+    }
+}

 ///
 /// This is the real implementation that uses a Postgres process to
 /// perform WAL replay. Only one thread can use the process at a time,
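
Note: the hunk above is the heart of the change. Shutdown is signalled through a
CancellationToken, mirrored into an AtomicBool for cheap hot-path checks, and
propagated by closing a Gate: once the gate is closed, no new walredo process can
be spawned, and close() only completes after every live process has dropped its
guard. Below is a minimal, self-contained sketch of the same pattern, using
tokio_util's TaskTracker as a stand-in for Neon's internal utils::sync::gate::Gate
(all names are illustrative; one difference is that TaskTracker::token() still
succeeds after close(), whereas Gate::enter() fails, so a faithful port would
check is_closed() first):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

struct GlobalState {
    shutdown: CancellationToken,
    shutdown_bool: AtomicBool, // cheap mirror of `shutdown` for hot-path checks
    spawn_gate: TaskTracker,   // stand-in for utils::sync::gate::Gate
}

impl GlobalState {
    fn spawn(shutdown: CancellationToken) -> Arc<Self> {
        let state = Arc::new(GlobalState {
            shutdown,
            shutdown_bool: AtomicBool::new(false),
            spawn_gate: TaskTracker::new(),
        });
        // The "actor": wait for shutdown, then propagate it.
        tokio::spawn({
            let state = Arc::clone(&state);
            async move {
                state.shutdown.cancelled().await;
                state.shutdown_bool.store(true, Ordering::Relaxed);
                state.spawn_gate.close();      // refuse new entries (see caveat above)
                state.spawn_gate.wait().await; // wait until all guards are dropped
                println!("all workers gone, no new ones will be spawned");
            }
        });
        state
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    let state = GlobalState::spawn(shutdown.clone());

    // A worker holds a token (the "gate guard") for as long as it is alive.
    let guard = state.spawn_gate.token();
    let worker_state = Arc::clone(&state);
    tokio::spawn(async move {
        // Hot path checks the atomic flag, not the token's internal mutex.
        while !worker_state.shutdown_bool.load(Ordering::Relaxed) {
            tokio::time::sleep(Duration::from_millis(10)).await; // "do work"
        }
        drop(guard); // worker observed shutdown and exits, releasing the gate
    });

    shutdown.cancel(); // the actor's wait() completes once the guard is dropped
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```
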
@@ -50,8 +97,8 @@ use utils::sync::heavier_once_cell;
 /// records.
 ///
 pub struct PostgresRedoManager {
+    global_state: Arc<GlobalState>,
     tenant_shard_id: TenantShardId,
-    conf: &'static PageServerConf,
     last_redo_at: std::sync::Mutex<Option<Instant>>,
     /// The current [`process::WalRedoProcess`] that is used by new redo requests.
     /// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
@@ -78,9 +125,10 @@ pub enum Error {

 macro_rules! bail {
     ($($arg:tt)*) => {
-        return Err(Error::Other(anyhow::anyhow!($($arg)*)));
+        return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
     }
 }
+pub(self) use bail;

 ///
 /// Public interface of WAL redo manager
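
Note: the bail! change is a macro-hygiene fix. With $crate::walredo::Error and a
leading ::anyhow::, the expansion no longer depends on what the call site has
imported, and the new `pub(self) use bail;` re-export is what lets the child
process module further down invoke it as `super::bail!`. A toy illustration of
the same point (hypothetical module layout, assuming the anyhow crate):

```rust
#![allow(dead_code)]

mod walredo {
    pub enum Error {
        Other(anyhow::Error),
    }

    // Absolute paths ($crate::..., ::anyhow::...) keep the expansion
    // independent of the call site's imports.
    macro_rules! bail {
        ($($arg:tt)*) => {
            return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
        }
    }
    pub(self) use bail;

    mod process {
        // Neither `Error` nor `anyhow!` is imported here, yet the macro expands fine.
        fn reject() -> Result<(), super::Error> {
            super::bail!("tried to pass neon wal record to postgres WAL redo");
        }
    }
}

fn main() {}
```
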
@@ -124,7 +172,7 @@ impl PostgresRedoManager {
                     img,
                     base_img_lsn,
                     &records[batch_start..i],
-                    self.conf.wal_redo_timeout,
+                    self.global_state.conf.wal_redo_timeout,
                     pg_version,
                 )
                 .await
@@ -145,7 +193,7 @@ impl PostgresRedoManager {
                     img,
                     base_img_lsn,
                     &records[batch_start..],
-                    self.conf.wal_redo_timeout,
+                    self.global_state.conf.wal_redo_timeout,
                     pg_version,
                 )
                 .await
@@ -175,13 +223,13 @@ impl PostgresRedoManager {
     /// Create a new PostgresRedoManager.
     ///
     pub fn new(
-        conf: &'static PageServerConf,
+        global_state: Arc<GlobalState>,
         tenant_shard_id: TenantShardId,
     ) -> PostgresRedoManager {
         // The actual process is launched lazily, on first request.
         PostgresRedoManager {
+            global_state,
             tenant_shard_id,
-            conf,
             last_redo_at: std::sync::Mutex::default(),
             redo_process: heavier_once_cell::OnceCell::default(),
         }
@@ -232,11 +280,16 @@ impl PostgresRedoManager {
         let start = Instant::now();
         let proc = Arc::new(
             process::WalRedoProcess::launch(
-                self.conf,
+                &self.global_state,
                 self.tenant_shard_id,
                 pg_version,
             )
-            .context("launch walredo process")?,
+            .map_err(|e| match e {
+                process::LaunchError::Cancelled => Error::Cancelled,
+                process::LaunchError::Other(e) => {
+                    Error::Other(e.context("launch walredo process"))
+                }
+            })?,
         );
         let duration = start.elapsed();
         WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
@@ -257,6 +310,12 @@ impl PostgresRedoManager {
             .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
             .await
             .context("apply_wal_records");
+        if result.is_err() {
+            // avoid
+            if self.global_state.shutdown.is_cancelled() {
+                return Err(Error::Cancelled);
+            }
+        }

         let duration = started_at.elapsed();

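
Note: the `result.is_err()` check above re-classifies failures that race with
shutdown: if the walredo process died because shutdown killed it, the resulting
apply/pipe error is noise and the caller should see a clean Cancelled instead.
A minimal sketch of that classification step (local stand-ins for the module's
Error and result types):

```rust
use anyhow::anyhow;
use tokio_util::sync::CancellationToken;

enum Error {
    Cancelled,
    Other(anyhow::Error),
}

// If the operation failed while shutdown was in progress, attribute the
// failure to the shutdown (e.g. the child process was killed) and report
// a cancellation rather than an error.
fn classify(result: anyhow::Result<Vec<u8>>, shutdown: &CancellationToken) -> Result<Vec<u8>, Error> {
    match result {
        Ok(page) => Ok(page),
        Err(_) if shutdown.is_cancelled() => Err(Error::Cancelled),
        Err(e) => Err(Error::Other(e)),
    }
}

fn main() {
    let shutdown = CancellationToken::new();
    shutdown.cancel();
    let out = classify(Err(anyhow!("broken pipe")), &shutdown);
    assert!(matches!(out, Err(Error::Cancelled)));
}
```
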
@@ -18,13 +18,22 @@ use std::sync::atomic::AtomicUsize;
 use std::{
     collections::VecDeque,
     process::{Command, Stdio},
     sync::Arc,
     time::Duration,
 };
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tracing::{debug, error, instrument, Instrument};
-use utils::{lsn::Lsn, poison::Poison};
+use utils::{
+    lsn::Lsn,
+    poison::Poison,
+    sync::gate::{GateError, GateGuard},
+};
+
+use super::GlobalState;

 pub struct WalRedoProcess {
+    global_state: Arc<GlobalState>,
+    _spawn_gate_guard: GateGuard,
+    #[allow(dead_code)]
     conf: &'static PageServerConf,
     #[cfg(feature = "testing")]
@@ -49,18 +58,33 @@ struct ProcessOutput {
     n_processed_responses: usize,
 }

+#[derive(Debug, thiserror::Error)]
+pub(super) enum LaunchError {
+    #[error("cancelled")]
+    Cancelled,
+    #[error(transparent)]
+    Other(#[from] anyhow::Error),
+}
+
 impl WalRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
     #[instrument(skip_all,fields(pg_version=pg_version))]
     pub(crate) fn launch(
-        conf: &'static PageServerConf,
+        global_state: &Arc<GlobalState>,
         tenant_shard_id: TenantShardId,
         pg_version: u32,
-    ) -> anyhow::Result<Self> {
+    ) -> Result<Self, LaunchError> {
         crate::span::debug_assert_current_span_has_tenant_id();

+        let conf = global_state.conf;
+
+        let spawn_gate_guard = match global_state.spawn_gate.enter() {
+            Ok(guard) => guard,
+            Err(GateError::GateClosed) => return Err(LaunchError::Cancelled),
+        };
+
         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

@@ -144,6 +168,8 @@ impl WalRedoProcess {
         );

         Ok(Self {
+            global_state: global_state.clone(),
+            _spawn_gate_guard: spawn_gate_guard,
             conf,
             #[cfg(feature = "testing")]
             tenant_shard_id,
@@ -189,9 +215,13 @@ impl WalRedoProcess {
         base_img: &Option<Bytes>,
         records: &[(Lsn, NeonWalRecord)],
         wal_redo_timeout: Duration,
-    ) -> anyhow::Result<Bytes> {
+    ) -> Result<Bytes, super::Error> {
         debug_assert_current_span_has_tenant_id();

+        if self.global_state.is_shutdown_requested() {
+            return Err(super::Error::Cancelled);
+        }
+
         let tag = protocol::BufferTag { rel, blknum };

         // Serialize all the messages to send the WAL redo process first.
@@ -216,17 +246,19 @@ impl WalRedoProcess {
             {
                 protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
             } else {
-                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
+                super::bail!("tried to pass neon wal record to postgres WAL redo");
             }
         }
         protocol::build_get_page_msg(tag, &mut writebuf);
         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

-        let Ok(res) =
-            tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
-        else {
-            anyhow::bail!("WAL redo timed out");
-        };
+        let res =
+            // TODO: we should tokio::select! on the self.global_state.shutdown here,
+            // but, that requires thinking through the perf implications
+            tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf))
+                .await
+                .map_err(|_elapsed| anyhow::anyhow!("WAL redo timed out"))?
+                .map_err(super::Error::Other);

         if res.is_err() {
             // not all of these can be caused by this particular input, however these are so rare
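
Note: the TODO in this hunk points at a possible follow-up: instead of only
bounding the pipe I/O with a timeout, the request could also race against the
shutdown token, at the cost of one extra branch per poll (the perf concern the
comment alludes to). A hedged sketch of that select!-based variant, not part of
this commit, with a local stand-in Error type:

```rust
use anyhow::anyhow;
use bytes::Bytes;
use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

enum Error {
    Cancelled,
    Other(anyhow::Error),
}

async fn apply_with_cancel(
    shutdown: &CancellationToken,
    wal_redo_timeout: Duration,
    redo_io: impl Future<Output = anyhow::Result<Bytes>>,
) -> Result<Bytes, Error> {
    tokio::select! {
        // `biased` polls the branches in order, so shutdown is checked on
        // every poll of the redo future.
        biased;
        _ = shutdown.cancelled() => Err(Error::Cancelled),
        res = tokio::time::timeout(wal_redo_timeout, redo_io) => match res {
            Err(_elapsed) => Err(Error::Other(anyhow!("WAL redo timed out"))),
            Ok(io_result) => io_result.map_err(Error::Other),
        },
    }
}
```
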
@@ -377,6 +409,11 @@ impl Drop for WalRedoProcess {
             .take()
             .expect("we only do this once")
             .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
+
+        // The spawn gate is supposed to track running walredo processes.
+        // => must keep guard alive until the process is dead.
+        let _ = &self._spawn_gate_guard;
+
         // no way to wait for stderr_logger_task from Drop because that is async only
     }
 }
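
Note: the `let _ = &self._spawn_gate_guard;` above is documentation rather than
behavior: after Drop::drop returns, a struct's remaining fields are dropped in
declaration order, so the gate guard is released strictly after kill_and_wait()
has run. A tiny standalone demonstration of that ordering guarantee:

```rust
struct Guard(&'static str);

impl Drop for Guard {
    fn drop(&mut self) {
        println!("released {}", self.0);
    }
}

struct Process {
    _gate_guard: Guard, // dropped only after Process::drop returns
}

impl Drop for Process {
    fn drop(&mut self) {
        // stands in for kill_and_wait(): runs before `_gate_guard` is dropped
        println!("kill_and_wait()");
    }
}

fn main() {
    drop(Process {
        _gate_guard: Guard("gate guard"),
    });
    // Output:
    // kill_and_wait()
    // released gate guard
}
```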