mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-03 16:30:38 +00:00
Compare commits
8 Commits
conrad/pro
...
problame/w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8a3b6f930 | ||
|
|
6779c908eb | ||
|
|
5202d2dc98 | ||
|
|
d7e0c99616 | ||
|
|
7507d137de | ||
|
|
34f42669fa | ||
|
|
943220df9b | ||
|
|
5e0ef715aa |
@@ -394,6 +394,10 @@ fn start_pageserver(
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
}
|
||||
|
||||
// Set up global state shared by all walredo processes.
|
||||
let walredo_global_state =
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::walredo::GlobalState::spawn(conf));
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
startup_checkpoint(started_startup_at, "initial", "Starting loading tenants");
|
||||
@@ -429,6 +433,7 @@ fn start_pageserver(
|
||||
broker_client: broker_client.clone(),
|
||||
remote_storage: remote_storage.clone(),
|
||||
deletion_queue_client,
|
||||
walredo_global_state: Arc::clone(&walredo_global_state),
|
||||
},
|
||||
order,
|
||||
shutdown_pageserver.clone(),
|
||||
@@ -689,7 +694,13 @@ fn start_pageserver(
|
||||
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
|
||||
pageserver::shutdown_pageserver(
|
||||
&tenant_manager,
|
||||
deletion_queue.clone(),
|
||||
walredo_global_state,
|
||||
0,
|
||||
)
|
||||
.await;
|
||||
unreachable!()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ pub mod deletion_queue;
|
||||
pub mod disk_usage_eviction_task;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use pageserver_api::keyspace;
|
||||
pub mod aux_file;
|
||||
pub mod metrics;
|
||||
@@ -58,6 +60,7 @@ pub use crate::metrics::preinitialize_metrics;
|
||||
pub async fn shutdown_pageserver(
|
||||
tenant_manager: &TenantManager,
|
||||
mut deletion_queue: DeletionQueue,
|
||||
walredo_global_state: Arc<walredo::GlobalState>,
|
||||
exit_code: i32,
|
||||
) {
|
||||
use std::time::Duration;
|
||||
@@ -79,6 +82,18 @@ pub async fn shutdown_pageserver(
|
||||
)
|
||||
.await;
|
||||
|
||||
// walredo processes are tenant-scoped and should have been shut down after tenant manager shutdown above.
|
||||
//
|
||||
// In practive, we have lingering walredo processes even when pageserver shuts down cleanly, i.e., even when it
|
||||
// does not hit systemd's TimeoutSec timeout (10 seconds in prod).
|
||||
// TODO: understand why the processes aren't gone by the time tenant_manager.shutdown() above returns.
|
||||
timed(
|
||||
walredo_global_state.shutdown(),
|
||||
"wait for all walredo processes to exit",
|
||||
Duration::from_secs(1),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
|
||||
// should already have been canclled via mgr::shutdown_all_tenants
|
||||
timed(
|
||||
|
||||
@@ -88,6 +88,7 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
|
||||
use crate::tenant::remote_timeline_client::INITDB_PATH;
|
||||
use crate::tenant::storage_layer::DeltaLayer;
|
||||
use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::walredo::Error;
|
||||
use crate::InitializationOrder;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
@@ -165,6 +166,7 @@ pub struct TenantSharedResources {
|
||||
pub broker_client: storage_broker::BrokerClientChannel,
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub walredo_global_state: Arc<crate::walredo::GlobalState>,
|
||||
}
|
||||
|
||||
/// A [`Tenant`] is really an _attached_ tenant. The configuration
|
||||
@@ -323,6 +325,17 @@ impl From<harness::TestRedoManager> for WalRedoManager {
|
||||
}
|
||||
|
||||
impl WalRedoManager {
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
match self {
|
||||
Self::Prod(mgr) => mgr.shutdown().await,
|
||||
#[cfg(test)]
|
||||
Self::Test(mgr) => {
|
||||
// Not applicable to test redo manager
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
|
||||
match self {
|
||||
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
|
||||
@@ -343,7 +356,7 @@ impl WalRedoManager {
|
||||
base_img: Option<(Lsn, bytes::Bytes)>,
|
||||
records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<bytes::Bytes> {
|
||||
) -> Result<bytes::Bytes, Error> {
|
||||
match self {
|
||||
Self::Prod(mgr) => {
|
||||
mgr.request_redo(key, lsn, base_img, records, pg_version)
|
||||
@@ -649,17 +662,18 @@ impl Tenant {
|
||||
mode: SpawnMode,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
)));
|
||||
|
||||
let TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage,
|
||||
deletion_queue_client,
|
||||
walredo_global_state,
|
||||
} = resources;
|
||||
|
||||
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
|
||||
walredo_global_state,
|
||||
tenant_shard_id,
|
||||
)));
|
||||
|
||||
let attach_mode = attached_conf.location.attach_mode;
|
||||
let generation = attached_conf.location.generation;
|
||||
|
||||
@@ -1877,6 +1891,10 @@ impl Tenant {
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
|
||||
|
||||
if let Some(walredo_mgr) = self.walredo_mgr.as_ref() {
|
||||
walredo_mgr.shutdown().await;
|
||||
}
|
||||
|
||||
// Wait for any in-flight operations to complete
|
||||
self.gate.close().await;
|
||||
|
||||
@@ -3953,7 +3971,7 @@ pub(crate) mod harness {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
_pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, Error> {
|
||||
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
|
||||
if records_neon {
|
||||
// For Neon wal records, we can decode without spawning postgres, so do so.
|
||||
|
||||
@@ -5235,10 +5235,16 @@ impl Timeline {
|
||||
.map_err(PageReconstructError::WalRedo)?
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
.await
|
||||
.context("reconstruct a page image")
|
||||
{
|
||||
Ok(img) => img,
|
||||
Err(e) => return Err(PageReconstructError::WalRedo(e)),
|
||||
Err(e) => {
|
||||
return Err(match e {
|
||||
crate::walredo::Error::Cancelled => PageReconstructError::Cancelled,
|
||||
crate::walredo::Error::Other(e) => {
|
||||
PageReconstructError::WalRedo(e.context("reconstruct a page image"))
|
||||
}
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
Ok(img)
|
||||
|
||||
@@ -40,8 +40,30 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::Gate;
|
||||
use utils::sync::heavier_once_cell;
|
||||
|
||||
pub struct GlobalState {
|
||||
conf: &'static PageServerConf,
|
||||
pub(self) spawn_gate: Gate,
|
||||
}
|
||||
|
||||
impl GlobalState {
|
||||
pub async fn spawn(conf: &'static PageServerConf) -> Arc<GlobalState> {
|
||||
let state = Arc::new(GlobalState {
|
||||
conf,
|
||||
spawn_gate: Gate::default(),
|
||||
});
|
||||
state
|
||||
}
|
||||
pub(crate) async fn shutdown(self: &Arc<Self>) {
|
||||
self.spawn_gate.close().await
|
||||
// The destructor of WalRedoProcess SIGKILLs and `wait()`s for the process
|
||||
// The gate guard is stored in WalRedoProcess.
|
||||
// So, we arrive here once all WalRedoProcess structs are gone.
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// This is the real implementation that uses a Postgres process to
|
||||
/// perform WAL replay. Only one thread can use the process at a time,
|
||||
@@ -50,8 +72,8 @@ use utils::sync::heavier_once_cell;
|
||||
/// records.
|
||||
///
|
||||
pub struct PostgresRedoManager {
|
||||
global_state: Arc<GlobalState>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
conf: &'static PageServerConf,
|
||||
last_redo_at: std::sync::Mutex<Option<Instant>>,
|
||||
/// The current [`process::WalRedoProcess`] that is used by new redo requests.
|
||||
/// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
|
||||
@@ -65,9 +87,30 @@ pub struct PostgresRedoManager {
|
||||
/// still be using the old redo process. But, those other tasks will most likely
|
||||
/// encounter an error as well, and errors are an unexpected condition anyway.
|
||||
/// So, probably we could get rid of the `Arc` in the future.
|
||||
redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
|
||||
redo_process: heavier_once_cell::OnceCell<RedoProcessState>,
|
||||
launch_process_gate: Gate,
|
||||
}
|
||||
|
||||
enum RedoProcessState {
|
||||
Launched(Arc<process::WalRedoProcess>),
|
||||
ManagerShutDown,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
macro_rules! bail {
|
||||
($($arg:tt)*) => {
|
||||
return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
|
||||
}
|
||||
}
|
||||
pub(self) use bail;
|
||||
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
@@ -88,9 +131,9 @@ impl PostgresRedoManager {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, Error> {
|
||||
if records.is_empty() {
|
||||
anyhow::bail!("invalid WAL redo request with no records");
|
||||
bail!("invalid WAL redo request with no records");
|
||||
}
|
||||
|
||||
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
|
||||
@@ -110,7 +153,7 @@ impl PostgresRedoManager {
|
||||
img,
|
||||
base_img_lsn,
|
||||
&records[batch_start..i],
|
||||
self.conf.wal_redo_timeout,
|
||||
self.global_state.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
.await
|
||||
@@ -131,7 +174,7 @@ impl PostgresRedoManager {
|
||||
img,
|
||||
base_img_lsn,
|
||||
&records[batch_start..],
|
||||
self.conf.wal_redo_timeout,
|
||||
self.global_state.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
.await
|
||||
@@ -148,10 +191,10 @@ impl PostgresRedoManager {
|
||||
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
|
||||
})
|
||||
},
|
||||
process: self
|
||||
.redo_process
|
||||
.get()
|
||||
.map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
|
||||
process: self.redo_process.get().and_then(|p| match &*p {
|
||||
RedoProcessState::Launched(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
|
||||
RedoProcessState::ManagerShutDown => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -161,18 +204,36 @@ impl PostgresRedoManager {
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
global_state: Arc<GlobalState>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> PostgresRedoManager {
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
global_state,
|
||||
tenant_shard_id,
|
||||
conf,
|
||||
last_redo_at: std::sync::Mutex::default(),
|
||||
redo_process: heavier_once_cell::OnceCell::default(),
|
||||
launch_process_gate: Gate::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
// prevent new launches
|
||||
let permit = match self.redo_process.get_or_init_detached().await {
|
||||
Ok(guard) => {
|
||||
let (proc, permit) = guard.take_and_deinit();
|
||||
drop(proc); // this just drops the Arc, its refcount may not be zero yet
|
||||
permit
|
||||
}
|
||||
Err(permit) => permit,
|
||||
};
|
||||
self.redo_process
|
||||
.set(RedoProcessState::ManagerShutDown, permit);
|
||||
|
||||
// wait for all WalRedoProcess objects to get dropped
|
||||
self.launch_process_gate.close().await;
|
||||
}
|
||||
|
||||
/// This type doesn't have its own background task to check for idleness: we
|
||||
/// rely on our owner calling this function periodically in its own housekeeping
|
||||
/// loops.
|
||||
@@ -203,7 +264,7 @@ impl PostgresRedoManager {
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, Error> {
|
||||
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
|
||||
|
||||
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
|
||||
@@ -212,17 +273,27 @@ impl PostgresRedoManager {
|
||||
loop {
|
||||
let proc: Arc<process::WalRedoProcess> =
|
||||
match self.redo_process.get_or_init_detached().await {
|
||||
Ok(guard) => Arc::clone(&guard),
|
||||
Ok(guard) => match &*guard {
|
||||
RedoProcessState::Launched(proc) => Arc::clone(proc),
|
||||
RedoProcessState::ManagerShutDown => {
|
||||
return Err(Error::Cancelled);
|
||||
}
|
||||
},
|
||||
Err(permit) => {
|
||||
// don't hold poison_guard, the launch code can bail
|
||||
let start = Instant::now();
|
||||
let proc = Arc::new(
|
||||
process::WalRedoProcess::launch(
|
||||
self.conf,
|
||||
&self.global_state,
|
||||
self.tenant_shard_id,
|
||||
pg_version,
|
||||
)
|
||||
.context("launch walredo process")?,
|
||||
.map_err(|e| match e {
|
||||
process::LaunchError::Cancelled => Error::Cancelled,
|
||||
process::LaunchError::Other(e) => {
|
||||
Error::Other(e.context("launch walredo process"))
|
||||
}
|
||||
})?,
|
||||
);
|
||||
let duration = start.elapsed();
|
||||
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
|
||||
@@ -231,7 +302,8 @@ impl PostgresRedoManager {
|
||||
pid = proc.id(),
|
||||
"launched walredo process"
|
||||
);
|
||||
self.redo_process.set(Arc::clone(&proc), permit);
|
||||
self.redo_process
|
||||
.set(RedoProcessState::Launched(Arc::clone(&proc)), permit);
|
||||
proc
|
||||
}
|
||||
};
|
||||
@@ -242,7 +314,14 @@ impl PostgresRedoManager {
|
||||
let result = proc
|
||||
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
|
||||
.await
|
||||
.context("apply_wal_records");
|
||||
.map_err(|e| match e {
|
||||
Error::Cancelled => Error::Cancelled,
|
||||
Error::Other(e) => Error::Other(e.context("apply_wal_records")),
|
||||
});
|
||||
if matches!(result, Err(Error::Cancelled)) {
|
||||
// bail asap and also avoid log noise due to the error reporting below
|
||||
return Err(Error::Cancelled);
|
||||
}
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
|
||||
@@ -299,12 +378,17 @@ impl PostgresRedoManager {
|
||||
match self.redo_process.get() {
|
||||
None => (),
|
||||
Some(guard) => {
|
||||
if Arc::ptr_eq(&proc, &*guard) {
|
||||
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
|
||||
guard.take_and_deinit();
|
||||
} else {
|
||||
// Another task already spawned another redo process (further up in this method)
|
||||
// and put it into `redo_process`. Do nothing, our view of the world is behind.
|
||||
match &*guard {
|
||||
RedoProcessState::ManagerShutDown => {}
|
||||
RedoProcessState::Launched(guard_proc) => {
|
||||
if Arc::ptr_eq(&proc, &*guard_proc) {
|
||||
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
|
||||
guard.take_and_deinit();
|
||||
} else {
|
||||
// Another task already spawned another redo process (further up in this method)
|
||||
// and put it into `redo_process`. Do nothing, our view of the world is behind.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -329,7 +413,7 @@ impl PostgresRedoManager {
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, Error> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut page = BytesMut::new();
|
||||
@@ -338,7 +422,7 @@ impl PostgresRedoManager {
|
||||
page.extend_from_slice(&fpi[..]);
|
||||
} else {
|
||||
// All the current WAL record types that we can handle require a base image.
|
||||
anyhow::bail!("invalid neon WAL redo request with no base image");
|
||||
bail!("invalid neon WAL redo request with no base image");
|
||||
}
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
|
||||
@@ -18,13 +18,21 @@ use std::sync::atomic::AtomicUsize;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
process::{Command, Stdio},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{debug, error, instrument, Instrument};
|
||||
use utils::{lsn::Lsn, poison::Poison};
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
poison::Poison,
|
||||
sync::gate::{GateError, GateGuard},
|
||||
};
|
||||
|
||||
use super::GlobalState;
|
||||
|
||||
pub struct WalRedoProcess {
|
||||
_spawn_gate_guard: GateGuard,
|
||||
#[allow(dead_code)]
|
||||
conf: &'static PageServerConf,
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -49,18 +57,33 @@ struct ProcessOutput {
|
||||
n_processed_responses: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(super) enum LaunchError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl WalRedoProcess {
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
#[instrument(skip_all,fields(pg_version=pg_version))]
|
||||
pub(crate) fn launch(
|
||||
conf: &'static PageServerConf,
|
||||
global_state: &Arc<GlobalState>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Self> {
|
||||
) -> Result<Self, LaunchError> {
|
||||
crate::span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let conf = global_state.conf;
|
||||
|
||||
let spawn_gate_guard = match global_state.spawn_gate.enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(GateError::GateClosed) => return Err(LaunchError::Cancelled),
|
||||
};
|
||||
|
||||
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
|
||||
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
|
||||
|
||||
@@ -144,6 +167,7 @@ impl WalRedoProcess {
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
_spawn_gate_guard: spawn_gate_guard,
|
||||
conf,
|
||||
#[cfg(feature = "testing")]
|
||||
tenant_shard_id,
|
||||
@@ -189,7 +213,7 @@ impl WalRedoProcess {
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
) -> Result<Bytes, super::Error> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let tag = protocol::BufferTag { rel, blknum };
|
||||
@@ -216,17 +240,19 @@ impl WalRedoProcess {
|
||||
{
|
||||
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
|
||||
super::bail!("tried to pass neon wal record to postgres WAL redo");
|
||||
}
|
||||
}
|
||||
protocol::build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
let Ok(res) =
|
||||
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
|
||||
else {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
};
|
||||
let res =
|
||||
// TODO: we should tokio::select! on the self.global_state.shutdown here,
|
||||
// but, that requires thinking through the perf implications
|
||||
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf))
|
||||
.await
|
||||
.map_err(|_elapsed| anyhow::anyhow!("WAL redo timed out"))?
|
||||
.map_err(super::Error::Other);
|
||||
|
||||
if res.is_err() {
|
||||
// not all of these can be caused by this particular input, however these are so rare
|
||||
@@ -377,6 +403,11 @@ impl Drop for WalRedoProcess {
|
||||
.take()
|
||||
.expect("we only do this once")
|
||||
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
|
||||
|
||||
// The spawn gate is supposed to track running walredo processes.
|
||||
// => must keep guard alive until the process is dead.
|
||||
let _ = &self._spawn_gate_guard;
|
||||
|
||||
// no way to wait for stderr_logger_task from Drop because that is async only
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user