mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
trim down the PR to just keeping track of walredo processes
This commit is contained in:
@@ -394,10 +394,9 @@ fn start_pageserver(
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
}
|
||||
|
||||
// Set up global tracking of walredo processes
|
||||
let walredo_global_state = BACKGROUND_RUNTIME.block_on(
|
||||
pageserver::walredo::GlobalState::spawn(conf, shutdown_pageserver.clone()),
|
||||
);
|
||||
// Set up global state shared by all walredo processes.
|
||||
let walredo_global_state =
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::walredo::GlobalState::spawn(conf));
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
@@ -695,7 +694,13 @@ fn start_pageserver(
|
||||
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
|
||||
pageserver::shutdown_pageserver(
|
||||
&tenant_manager,
|
||||
deletion_queue.clone(),
|
||||
walredo_global_state,
|
||||
0,
|
||||
)
|
||||
.await;
|
||||
unreachable!()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -82,13 +82,14 @@ pub async fn shutdown_pageserver(
|
||||
)
|
||||
.await;
|
||||
|
||||
// In theory, walredo processes are tenant-scoped and should have been shut down after
|
||||
// tenant manager shutdown above.
|
||||
// In practice, we have lingering walredo processes even after pageserver shutdowns that
|
||||
// don't hit the systemd TimeoutSec timeout of 10 seconds (i.e., that log `Shut down successfully completed` below).
|
||||
// walredo processes are tenant-scoped and should have been shut down after tenant manager shutdown above.
|
||||
//
|
||||
// In practice, we have lingering walredo processes even when pageserver shuts down cleanly, i.e., even when it
|
||||
// does not hit systemd's TimeoutSec timeout (10 seconds in prod).
|
||||
// TODO: understand why the processes aren't gone by the time tenant_manager.shutdown() above returns.
|
||||
timed(
|
||||
walredo_global_state.wait_shutdown_complete(),
|
||||
"wait for walredo processes to exit",
|
||||
walredo_global_state.shutdown(),
|
||||
"wait for all walredo processes to exit",
|
||||
Duration::from_secs(1),
|
||||
)
|
||||
.await;
|
||||
@@ -118,11 +119,10 @@ pub async fn shutdown_pageserver(
|
||||
// There should be nothing left, but let's be sure
|
||||
timed(
|
||||
task_mgr::shutdown_tasks(None, None, None),
|
||||
"shutdown taskmgr leftovers",
|
||||
"shutdown leftovers",
|
||||
Duration::from_secs(1),
|
||||
)
|
||||
.await;
|
||||
|
||||
info!("Shut down successfully completed");
|
||||
std::process::exit(exit_code);
|
||||
}
|
||||
|
||||
@@ -5240,7 +5240,9 @@ impl Timeline {
|
||||
Err(e) => {
|
||||
return Err(match e {
|
||||
crate::walredo::Error::Cancelled => PageReconstructError::Cancelled,
|
||||
crate::walredo::Error::Other(e) => PageReconstructError::WalRedo(e),
|
||||
crate::walredo::Error::Other(e) => {
|
||||
PageReconstructError::WalRedo(e.context("reconstruct a page image"))
|
||||
}
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
@@ -35,11 +35,9 @@ use anyhow::Context;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::Gate;
|
||||
@@ -47,49 +45,18 @@ use utils::sync::heavier_once_cell;
|
||||
|
||||
pub struct GlobalState {
|
||||
conf: &'static PageServerConf,
|
||||
/// Watched by the [`Self::actor`].
|
||||
shutdown: CancellationToken,
|
||||
/// Set by [`Self::actor`] when [`Self::shutdown`] is cancelled.
|
||||
/// We do this to avoid the Mutex lock inside the `CancellationToken`.
|
||||
shutdown_bool: AtomicBool,
|
||||
pub(self) spawn_gate: Gate,
|
||||
}
|
||||
|
||||
impl GlobalState {
|
||||
pub async fn spawn(
|
||||
conf: &'static PageServerConf,
|
||||
shutdown: CancellationToken,
|
||||
) -> Arc<GlobalState> {
|
||||
pub async fn spawn(conf: &'static PageServerConf) -> Arc<GlobalState> {
|
||||
let state = Arc::new(GlobalState {
|
||||
conf,
|
||||
shutdown,
|
||||
shutdown_bool: AtomicBool::new(false), // if `shutdown` is cancelled already, the task spawned below will set it promptly
|
||||
spawn_gate: Gate::default(),
|
||||
});
|
||||
tokio::spawn({
|
||||
let state = Arc::clone(&state);
|
||||
async move {
|
||||
info!("starting");
|
||||
state.actor().await;
|
||||
info!("done");
|
||||
}
|
||||
.instrument(info_span!(parent: None, "walredo_global_state"))
|
||||
});
|
||||
state
|
||||
}
|
||||
async fn actor(self: Arc<Self>) {
|
||||
self.shutdown.cancelled().await;
|
||||
info!("propagating cancellation");
|
||||
self.shutdown_bool.store(true, Ordering::Relaxed);
|
||||
self.spawn_gate.close().await;
|
||||
info!("all walredo processes have been killed and no new ones will be spawned");
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_shutdown_complete(self: &Arc<Self>) {
|
||||
assert!(
|
||||
self.shutdown.is_cancelled(),
|
||||
"must cancel the `shutdown` token before waiting, otherwise we will wait forever"
|
||||
);
|
||||
pub(crate) async fn shutdown(self: &Arc<Self>) {
|
||||
self.spawn_gate.close().await
|
||||
// The destructor of WalRedoProcess SIGKILLs and `wait()`s for the process
|
||||
// The gate guard is stored in WalRedoProcess.
|
||||
|
||||
@@ -32,7 +32,6 @@ use utils::{
|
||||
use super::GlobalState;
|
||||
|
||||
pub struct WalRedoProcess {
|
||||
global_state: Arc<GlobalState>,
|
||||
_spawn_gate_guard: GateGuard,
|
||||
#[allow(dead_code)]
|
||||
conf: &'static PageServerConf,
|
||||
@@ -168,7 +167,6 @@ impl WalRedoProcess {
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
global_state: global_state.clone(),
|
||||
_spawn_gate_guard: spawn_gate_guard,
|
||||
conf,
|
||||
#[cfg(feature = "testing")]
|
||||
|
||||
Reference in New Issue
Block a user