Compare commits

..

6 Commits

Author SHA1 Message Date
Joonas Koivunen
767c8bb95f implement without closing the semaphore
Not closing the semaphore requires an additional piece of state, an atomic
boolean, which describes whether the permit received is the unique one or
not. This does not, however, change the problem of progress for the
losing/descheduled initializer future.

`take_and_deinit` becomes async in this version, because all pending
initializers are waited out before anything is actually taken. In
practice, there should be no one to wait for.

The commit also adds a DEBUG-level span, plus debug- and trace-level logging.
2024-02-07 08:51:47 +00:00
Joonas Koivunen
f5b0e723cb test: rewrite back from macro_rules 2024-02-07 06:50:43 +00:00
Joonas Koivunen
9d11fcca02 add comforting debug_assert_eq 2024-02-06 18:34:30 +00:00
Joonas Koivunen
013496c42b cleanup 2024-02-06 18:34:30 +00:00
Joonas Koivunen
0d417d7c0f fix without additional arc clones 2024-02-06 18:34:30 +00:00
Joonas Koivunen
fb5420a959 test: reproduce the race 2024-02-06 18:34:30 +00:00
18 changed files with 458 additions and 504 deletions

View File

@@ -132,7 +132,7 @@ jobs:
check-codestyle-rust:
needs: [ check-permissions, build-buildtools-image ]
runs-on: [ self-hosted, gen3, small ]
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
options: --init

16
Cargo.lock generated
View File

@@ -196,19 +196,6 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener 4.0.0",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.4.5"
@@ -2390,7 +2377,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
dependencies = [
"anyhow",
"async-channel 1.9.0",
"async-channel",
"base64 0.13.1",
"futures-lite",
"infer",
@@ -6294,7 +6281,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-channel 2.1.1",
"async-trait",
"bincode",
"byteorder",

View File

@@ -40,7 +40,6 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-channel = "2.1.1"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.18"
azure_identity = "0.18"

View File

@@ -12,7 +12,6 @@ testing = ["fail/failpoints"]
[dependencies]
arc-swap.workspace = true
async-channel.workspace = true
sentry.workspace = true
async-trait.workspace = true
anyhow.workspace = true

View File

@@ -87,8 +87,6 @@ pub mod failpoint_support;
pub mod yielding_loop;
pub mod pre_spawned_pool;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -1,159 +0,0 @@
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tracing::{debug, instrument};
use crate::backoff;
/// Handle used by callers to take items produced by the pool task and to
/// change the pool's slot count.
///
/// Both fields are the client-side ends of the channels created in
/// `Pool::launch`.
pub struct Client<T> {
// Sends `Command`s to the pool task.
cmds_tx: mpsc::UnboundedSender<Command>,
// Receives the items created by the pool task.
items_rx: async_channel::Receiver<CreatedItem<T>>,
}
/// Factory for the items of type `T` that the pool creates ahead of time.
pub trait Launcher<T> {
/// Static label for this launcher; the pool task passes it to `backoff::retry`.
fn what() -> &'static str;
/// Creates one item, or returns the error that prevented its creation.
fn create(&self) -> anyhow::Result<T>;
}
/// Control messages sent from a `Client` to the pool task.
#[derive(Debug)]
enum Command {
/// Set the configured number of slots to the given value.
SetSlotCount(usize),
}
/// Error returned by `Client::get`.
#[derive(thiserror::Error, Debug)]
pub enum GetError {
/// Receiving from the item channel failed.
#[error("shutting down")]
ShuttingDown,
}
impl<T> Client<T> {
/// Waits for the next item from the pool task and returns it.
///
/// The permit that travelled with the item is dropped before returning.
///
/// # Errors
///
/// Returns [`GetError::ShuttingDown`] if receiving from the item channel fails.
pub async fn get(&self) -> Result<T, GetError> {
self.items_rx
.recv()
.await
.map_err(|_| GetError::ShuttingDown)
.map(|CreatedItem { permit, item }| {
drop(permit); // allow a new one to be pre-spawned
item
})
}
/// Sends `Command::SetSlotCount(count)` to the pool task and returns without
/// waiting for the command to be applied.
///
/// # Panics
///
/// Panics if sending on the command channel fails.
pub fn set_slot_count_nowait(&self, count: usize) {
self.cmds_tx
.send(Command::SetSlotCount(count))
.expect("while cmds_tx is open, the pool task doesn't exit");
}
}
/// State owned by the background pool task that is spawned in `Pool::launch`.
pub struct Pool<T, L>
where
T: Send + 'static,
L: Send + Launcher<T> + 'static,
{
// Creates the items.
launcher: L,
// Receiving end for the `Command`s sent through `Client`.
cmds_rx: mpsc::UnboundedReceiver<Command>,
// Sending end for created items; `Client::get` reads the other end.
items_tx: async_channel::Sender<CreatedItem<T>>,
}
/// An item together with the semaphore permit that accompanies it through the
/// item channel.
struct CreatedItem<T> {
// Dropped by `Client::get` when the item is handed to the caller.
permit: OwnedSemaphorePermit,
item: T,
}
impl<T, L> Pool<T, L>
where
T: Send + 'static,
L: Send + Launcher<T> + 'static,
{
/// Creates the command and item channels, spawns the pool task with
/// `tokio::spawn`, and returns the `Client` holding the other channel ends.
///
/// NOTE(review): the function is `async` but its body contains no `.await`.
pub async fn launch(launcher: L) -> Client<T> {
let (cmds_tx, cmds_rx) = mpsc::unbounded_channel(); // callers are limited to mgmt api
let (items_tx, items_rx) = async_channel::unbounded(); // task() limits pending items itself
// task gets cancelled by dropping the last Client
tokio::spawn(
Self {
launcher,
cmds_rx,
items_tx,
}
.task(),
);
Client { cmds_tx, items_rx }
}
/// Main loop of the pool task: reacts to `Command`s and sends created items
/// to the client.
///
/// The semaphore `pending_items` starts with zero permits; `SetSlotCount`
/// adds permits when growing and records a number of permits to forget
/// (`need_forget`) when shrinking.
///
/// NOTE(review): as visible here this body looks unfinished — see the inline
/// notes. `error!`, `report_compact_sources` and `CancellationToken` are also
/// not among the `use` lines visible at the top of this file.
#[instrument(skip_all)]
async fn task(mut self) {
let initial = 0;
let mut configured = initial;
let pending_items = Arc::new(tokio::sync::Semaphore::new(initial));
let mut need_forget = 0;
// NOTE(review): never assigned anything but `None` in the visible code.
let mut last_launcher_failure_at = None;
loop {
debug!(
configured,
need_forget,
available = pending_items.available_permits(),
last_launcher_failure_secs_ago =
last_launcher_failure_at.map(|at| at.elapsed().as_secs_f64()),
"iteration"
);
// One launch attempt: take a permit, then either forget it (when shrinking)
// or ask the launcher for a new item.
let try_launch_once = || async {
// NOTE(review): there is no `.await` between `acquire_owned()` and
// `.expect(...)`; tokio's `acquire_owned` is an async fn — confirm.
let permit = Arc::clone(&pending_items)
.acquire_owned()
.expect("we never close this semaphore");
if need_forget > 0 {
debug!("fogetting permit to reduce semaphore count");
need_forget -= 1;
permit.forget();
// NOTE(review): `continue` sits inside the closure's async block, not
// directly inside the enclosing `loop`.
continue;
}
debug!("creating item");
// NOTE(review): the `Err` arm yields no value, and neither `item` nor
// `permit` is returned from the closure.
let item = match self.launcher.create() {
Ok(item) => item,
Err(e) => {
error!(
"launcher failed to create item: {}",
report_compact_sources(&e)
);
}
};
};
let try_launch_retrying = backoff::retry(
try_launch_once,
|_| false,
0,
u32::MAX,
L::what(),
CancellationToken::new(),
);
// Wait for whichever comes first: a command, or a freshly created item.
// NOTE(review): the second branch polls `try_launch` and uses `permit`, neither
// of which is defined in the visible code (`try_launch_retrying` is).
let cmd = tokio::select! {
res = self.cmds_rx.recv() => {
match res {
Some(cmd) => cmd,
None => return, // dropping tx acts as cancellation
}
}
item = try_launch => {
match self.items_tx.send(CreatedItem { permit, item }).await {
Ok(()) => continue,
Err(_) => {
debug!("stopping, client has gone away");
return;
}
}
}
};
debug!(?cmd, "received command");
match cmd {
Command::SetSlotCount(desired) => {
if desired > configured {
// growing: the extra permits become available immediately
pending_items.add_permits(desired - configured);
} else if desired < configured {
// shrinking: remember how many permits still have to be forgotten
need_forget += configured - desired;
}
configured = desired;
}
}
}
}
}

View File

@@ -1,5 +1,5 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
use tokio::sync::Semaphore;
@@ -14,6 +14,18 @@ use tokio::sync::Semaphore;
pub struct OnceCell<T> {
// Lock around the cell state (`Inner`: the optional value and the init semaphore).
inner: tokio::sync::RwLock<Inner<T>>,
// Counter read by `initializer_count`.
initializers: AtomicUsize,
/// Do we have one permit or `u32::MAX` permits?
///
/// Having one permit means the cell is not initialized, and one winning future could
/// initialize it. The act of initializing the cell adds `u32::MAX` permits and sets this to
/// `false`.
///
/// Deinitializing an initialized cell will first take `u32::MAX` permits, handing one of them
/// out, then set this back to `true`.
///
/// Because we need to see all changes to this variable, always use Acquire to read, AcqRel to
/// compare_exchange.
has_one_permit: AtomicBool,
}
impl<T> Default for OnceCell<T> {
@@ -22,6 +34,7 @@ impl<T> Default for OnceCell<T> {
Self {
inner: Default::default(),
initializers: AtomicUsize::new(0),
has_one_permit: AtomicBool::new(true),
}
}
}
@@ -47,14 +60,14 @@ impl<T> Default for Inner<T> {
impl<T> OnceCell<T> {
/// Creates an already initialized `OnceCell` with the given value.
pub fn new(value: T) -> Self {
let sem = Semaphore::new(1);
sem.close();
let sem = Semaphore::new(u32::MAX as usize);
Self {
inner: tokio::sync::RwLock::new(Inner {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
initializers: AtomicUsize::new(0),
has_one_permit: AtomicBool::new(false),
}
}
@@ -64,42 +77,47 @@ impl<T> OnceCell<T> {
/// Initializing might wait on any existing [`GuardMut::take_and_deinit`] deinitialization.
///
/// Initialization is panic-safe and cancellation-safe.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn get_mut_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardMut<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
loop {
let sem = {
let guard = self.inner.write().await;
if guard.value.is_some() {
tracing::debug!("returning GuardMut over existing value");
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
{
let permit = {
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
};
let permit = permit.expect("semaphore is never closed");
if !self.has_one_permit.load(Ordering::Acquire) {
// it is important that the permit is dropped here otherwise there would be a
// deadlock with `take_and_deinit` happening at the same time.
tracing::trace!("seems initialization happened already, trying again");
continue;
}
permit.forget();
}
tracing::trace!("calling factory");
let permit = InitPermit::from(sem);
let (value, permit) = factory(permit).await?;
let guard = self.inner.write().await;
if guard.value.is_some() {
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.write().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardMut(guard));
}
return Ok(self.set0(value, guard, permit));
}
}
@@ -107,42 +125,48 @@ impl<T> OnceCell<T> {
/// returning the guard.
///
/// Initialization is panic-safe and cancellation-safe.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardRef<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.read().await;
if guard.value.is_some() {
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard).downgrade())
}
Err(_closed) => {
loop {
let sem = {
let guard = self.inner.read().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardRef(guard));
if guard.value.is_some() {
tracing::debug!("returning GuardRef over existing value");
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
{
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire().await
};
let permit = permit.expect("semaphore is never closed");
if !self.has_one_permit.load(Ordering::Acquire) {
tracing::trace!("seems initialization happened already, trying again");
continue;
} else {
// it is our turn to initialize for sure
}
permit.forget();
}
tracing::trace!("calling factory");
let permit = InitPermit::from(sem);
let (value, permit) = factory(permit).await?;
let guard = self.inner.write().await;
return Ok(self.set0(value, guard, permit).downgrade());
}
}
@@ -152,26 +176,47 @@ impl<T> OnceCell<T> {
/// # Panics
///
/// If the inner has already been initialized.
pub async fn set(&self, value: T, _permit: InitPermit) -> GuardMut<'_, T> {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn set(&self, value: T, permit: InitPermit) -> GuardMut<'_, T> {
let guard = self.inner.write().await;
assert!(
self.has_one_permit.load(Ordering::Acquire),
"cannot set when there are multiple permits"
);
// cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
// give more permits right now.
if guard.init_semaphore.try_acquire().is_ok() {
let available = guard.init_semaphore.available_permits();
drop(guard);
panic!("permit is of wrong origin");
panic!("permit is of wrong origin: {available}");
}
Self::set0(value, guard)
self.set0(value, guard, permit)
}
fn set0(value: T, mut guard: tokio::sync::RwLockWriteGuard<'_, Inner<T>>) -> GuardMut<'_, T> {
fn set0<'a>(
&'a self,
value: T,
mut guard: tokio::sync::RwLockWriteGuard<'a, Inner<T>>,
permit: InitPermit,
) -> GuardMut<'a, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
assert!(
self.has_one_permit
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok(),
"should had only had one permit"
);
permit.forget();
guard.init_semaphore.add_permits(u32::MAX as usize);
tracing::debug!("value initialized");
GuardMut(guard)
}
@@ -199,6 +244,48 @@ impl<T> OnceCell<T> {
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
}
/// Take the current value, and a new permit for its deinitialization.
///
/// The returned permit is created over the cell's current `init_semaphore`, after all of its
/// permits have been drained. Any following [`OnceCell::get_or_init`] will wait on that
/// semaphore until the permit is dropped (which adds one permit back) or handed to
/// [`OnceCell::set`].
///
/// The write guard passed in as `guard` is held for the whole call, including while waiting
/// for the semaphore permits.
///
/// # Panics
///
/// Panics if the cell is in the "one permit" state on entry, if `guard` holds no value, or if
/// the state flag cannot be switched from `false` to `true` after draining.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn take_and_deinit(&self, mut guard: GuardMut<'_, T>) -> (T, InitPermit) {
// guard exists => we have been initialized
assert!(
!self.has_one_permit.load(Ordering::Acquire),
"has to have all permits after initializing"
);
assert!(guard.0.value.is_some(), "guard exists => initialized");
// we must first drain out all "waiting to initialize" stragglers
tracing::trace!("draining other initializers");
let all_permits = guard
.0
.init_semaphore
.acquire_many(u32::MAX)
.await
.expect("never closed");
// forget instead of drop: the drained permits must not return to the semaphore
all_permits.forget();
tracing::debug!("other initializers drained");
assert_eq!(guard.0.init_semaphore.available_permits(), 0);
// now that the permits have been drained, switch the state
assert!(
self.has_one_permit
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok(),
"there should be only one GuardMut attempting take_and_deinit"
);
let value = guard.0.value.take().unwrap();
// act of creating an init_permit is the same as "adding back one when this is dropped"
let init_permit = InitPermit::from(guard.0.init_semaphore.clone());
(value, init_permit)
}
}
/// DropGuard counter for queued tasks waiting to initialize, mainly accessible for the
@@ -244,24 +331,6 @@ impl<T> std::ops::DerefMut for GuardMut<'_, T> {
}
impl<'a, T> GuardMut<'a, T> {
/// Take the current value, and a new permit for it's deinitialization.
///
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
.clone()
.try_acquire_owned()
.expect("we just created this");
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(permit)))
.expect("guard is not created unless value has been initialized")
}
pub fn downgrade(self) -> GuardRef<'a, T> {
GuardRef(self.0.downgrade())
}
@@ -282,13 +351,39 @@ impl<T> std::ops::Deref for GuardRef<'_, T> {
}
/// Type held by OnceCell (de)initializing task.
pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
pub struct InitPermit(Option<Arc<tokio::sync::Semaphore>>);
/// Wraps a semaphore handle into an `InitPermit`.
///
/// No permit is acquired from the semaphore by this conversion.
impl From<Arc<tokio::sync::Semaphore>> for InitPermit {
fn from(value: Arc<tokio::sync::Semaphore>) -> Self {
InitPermit(Some(value))
}
}
impl InitPermit {
/// Consumes the permit without adding a permit back to the semaphore.
///
/// Takes the semaphore handle out of `self`, so the `Drop` impl finds `None`
/// and does nothing.
///
/// # Panics
///
/// Panics if the handle is already `None`.
fn forget(mut self) {
self.0
.take()
.expect("unable to forget twice, created with None?");
}
}
/// Dropping an `InitPermit` that still holds its semaphore handle adds one
/// permit to that semaphore.
impl Drop for InitPermit {
fn drop(&mut self) {
// `None` means `forget` already took the handle: nothing to give back.
if let Some(sem) = self.0.take() {
// debug builds only: no permit is expected to be available while this one is held
debug_assert_eq!(sem.available_permits(), 0);
sem.add_permits(1);
}
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use super::*;
use std::{
convert::Infallible,
pin::{pin, Pin},
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
@@ -366,11 +461,8 @@ mod tests {
let cell = cell.clone();
let deinitialization_started = deinitialization_started.clone();
async move {
let (answer, _permit) = cell
.get_mut()
.await
.expect("initialized to value")
.take_and_deinit();
let guard = cell.get_mut().await.unwrap();
let (answer, _permit) = cell.take_and_deinit(guard).await;
assert_eq!(answer, initial);
deinitialization_started.wait().await;
@@ -399,12 +491,31 @@ mod tests {
#[tokio::test]
async fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
assert!(!cell.has_one_permit.load(Ordering::Acquire));
assert_eq!(
cell.inner.read().await.init_semaphore.available_permits(),
u32::MAX as usize
);
let guard = cell.get_mut().await.unwrap();
assert!(!cell.has_one_permit.load(Ordering::Acquire));
assert_eq!(
guard.0.init_semaphore.available_permits(),
u32::MAX as usize
);
let (mol, permit) = cell.take_and_deinit(guard).await;
assert!(cell.has_one_permit.load(Ordering::Acquire));
assert_eq!(
cell.inner.read().await.init_semaphore.available_permits(),
0
);
let (mol, permit) = cell.get_mut().await.unwrap().take_and_deinit();
cell.set(5, permit).await;
assert_eq!(*cell.get_mut().await.unwrap(), 5);
let (five, permit) = cell.get_mut().await.unwrap().take_and_deinit();
let guard = cell.get_mut().await.unwrap();
let (five, permit) = cell.take_and_deinit(guard).await;
assert_eq!(5, five);
cell.set(mol, permit).await;
assert_eq!(*cell.get_mut().await.unwrap(), 42);
@@ -455,4 +566,110 @@ mod tests {
.unwrap();
assert_eq!(*g, "now initialized");
}
/// Runs `init_take_deinit_scenario` with `OnceCell::get_or_init` as the
/// initialization method.
#[tokio::test(start_paused = true)]
async fn reproduce_init_take_deinit_race_ref() {
init_take_deinit_scenario(|cell, factory| {
Box::pin(async {
cell.get_or_init(factory).await.unwrap();
})
})
.await;
}
/// Runs `init_take_deinit_scenario` with `OnceCell::get_mut_or_init` as the
/// initialization method.
#[tokio::test(start_paused = true)]
async fn reproduce_init_take_deinit_race_mut() {
init_take_deinit_scenario(|cell, factory| {
Box::pin(async {
cell.get_mut_or_init(factory).await.unwrap();
})
})
.await;
}
/// Boxed future produced by an initializer: resolves to the value plus the
/// `InitPermit`, or to an error.
type BoxedInitFuture<T, E> = Pin<Box<dyn Future<Output = Result<(T, InitPermit), E>>>>;
/// Boxed initializer function: takes the `InitPermit` and returns a `BoxedInitFuture`.
type BoxedInitFunction<T, E> = Box<dyn Fn(InitPermit) -> BoxedInitFuture<T, E>>;
/// Reproduce an assertion failure with both initialization methods.
///
/// `init_way` selects the initialization method under test: it receives the cell and a boxed
/// initializer and returns the future that performs the initialization.
///
/// This has interesting generics in order to be generic over `get_or_init` and
/// `get_mut_or_init`. The alternative would be a macro_rules!, but that is the last resort.
async fn init_take_deinit_scenario<F>(init_way: F)
where
F: for<'a> Fn(
&'a OnceCell<&'static str>,
BoxedInitFunction<&'static str, Infallible>,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
{
use tracing::Instrument;
let cell = OnceCell::default();
// acquire the only permit of the init_semaphore, so that both initializing tasks end up
// waiting on the same semaphore.
let permit = cell
.inner
.read()
.await
.init_semaphore
.clone()
.try_acquire_owned()
.unwrap();
let mut t1 = pin!(init_way(
&cell,
Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })),
)
.instrument(tracing::info_span!("t1")));
let mut t2 = pin!(init_way(
&cell,
Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })),
)
.instrument(tracing::info_span!("t2")));
// drive t2 first to the init_semaphore
// NOTE(review): the very long sleep presumably relies on the callers' `start_paused = true`
// (tokio's paused clock) so that it completes without real waiting — confirm.
tokio::select! {
_ = &mut t2 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// followed by t1 in the init_semaphore
tokio::select! {
_ = &mut t1 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// now let t2 proceed and initialize
drop(permit);
t2.await;
// in original implementation which did closing and re-creation of the semaphore, t1 was
// still stuck on the first semaphore, but now that t1 and deinit are using the same
// semaphore, deinit will have to wait for t1.
let mut deinit = pin!(async {
let guard = cell.get_mut().await.unwrap();
cell.take_and_deinit(guard).await
}
.instrument(tracing::info_span!("deinit")));
tokio::select! {
_ = &mut deinit => unreachable!("deinit must not make progress before t1 is complete"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// in the original implementation, t1 would now see its semaphore as closed, while not yet
// being able to get a permit from the new one. here t1 must still be unable to complete.
tokio::select! {
_ = &mut t1 => unreachable!("it cannot get permit"),
_ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
}
// deinit completes first and hands out the value that t2 stored
let (s, _) = deinit.await;
assert_eq!("t2", s);
// only then can t1 initialize the cell with its own value
t1.await;
assert_eq!("t1", *cell.get().await.unwrap());
}
}

View File

@@ -309,9 +309,6 @@ fn start_pageserver(
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
let walredo_process_pool =
Arc::new(COMPUTE_REQUEST_RUNTIME.block_on(pageserver::walredo::ProcessPool::launch(conf)));
// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
@@ -417,7 +414,6 @@ fn start_pageserver(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
walredo_process_pool: walredo_process_pool.clone(),
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
@@ -549,7 +545,6 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
walredo_process_pool,
)
.context("Failed to initialize router state")?,
);

View File

@@ -55,9 +55,7 @@ use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::SpawnMode;
use crate::tenant::TenantSharedResources;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::walredo;
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
@@ -99,7 +97,6 @@ pub struct State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
walredo_process_pool: Arc<walredo::ProcessPool>,
}
impl State {
@@ -113,7 +110,6 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
walredo_process_pool: Arc<walredo::ProcessPool>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -129,18 +125,8 @@ impl State {
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
walredo_process_pool,
})
}
pub(crate) fn tenant_shared_resources(&self) -> TenantSharedResources {
TenantSharedResources {
walredo_process_pool: Arc::clone(&self.walredo_process_pool),
broker_client: self.broker_client.clone(),
remote_storage: self.remote_storage.clone(),
deletion_queue_client: self.deletion_queue_client.clone(),
}
}
}
#[inline(always)]
@@ -917,7 +903,9 @@ async fn tenant_load_handler(
state.conf,
tenant_id,
generation,
state.tenant_shared_resources(),
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.instrument(info_span!("load", %tenant_id))
@@ -992,7 +980,7 @@ async fn tenant_status(
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
},
walredo: tenant.wal_redo_manager_status().await,
walredo: tenant.wal_redo_manager_status(),
timelines: tenant.list_timeline_ids(),
})
}

View File

@@ -1652,7 +1652,7 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
});
#[rustfmt::skip]
pub(crate) static WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO: Lazy<Histogram> = Lazy::new(|| {
pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_process_launch_duration",
"Histogram of the duration of successful WalRedoProcess::launch calls",
@@ -2452,7 +2452,7 @@ pub fn preinitialize_metrics() {
&WAL_REDO_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,
&WAL_REDO_BYTES_HISTOGRAM,
&WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
&WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
]
.into_iter()
.for_each(|h| {

View File

@@ -189,7 +189,6 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub walredo_process_pool: Arc<crate::walredo::ProcessPool>,
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
@@ -279,7 +278,7 @@ pub struct Tenant {
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Option<Arc<WalRedoManager>>,
walredo_mgr: Arc<WalRedoManager>,
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: Option<GenericRemoteStorage>,
@@ -335,9 +334,9 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
@@ -369,9 +368,9 @@ impl WalRedoManager {
}
}
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(m) => m.status().await,
WalRedoManager::Prod(m) => m.status(),
#[cfg(test)]
WalRedoManager::Test(_) => None,
}
@@ -617,18 +616,17 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
let TenantSharedResources {
walredo_process_pool,
broker_client,
remote_storage,
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
walredo_process_pool,
)));
let attach_mode = attached_conf.location.attach_mode;
let generation = attached_conf.location.generation;
@@ -637,7 +635,7 @@ impl Tenant {
conf,
attached_conf,
shard_identity,
Some(wal_redo_manager),
wal_redo_manager,
tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
@@ -1197,6 +1195,10 @@ impl Tenant {
tenant_shard_id: TenantShardId,
reason: String,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {
reason,
@@ -1207,7 +1209,7 @@ impl Tenant {
// Shard identity isn't meaningful for a broken tenant: it's just a placeholder
// to occupy the slot for this TenantShardId.
ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
None,
wal_redo_manager,
tenant_shard_id,
None,
DeletionQueueClient::broken(),
@@ -1975,11 +1977,8 @@ impl Tenant {
self.generation
}
pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
let Some(mgr) = self.walredo_mgr.as_ref() else {
return None;
};
mgr.status().await
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
self.walredo_mgr.status()
}
/// Changes tenant status to active, unless shutdown was already requested.
@@ -2614,7 +2613,7 @@ impl Tenant {
self.tenant_shard_id,
self.generation,
self.shard_identity,
self.walredo_mgr.as_ref().map(Arc::clone),
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
state,
@@ -2632,7 +2631,7 @@ impl Tenant {
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<WalRedoManager>>,
walredo_mgr: Arc<WalRedoManager>,
tenant_shard_id: TenantShardId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
@@ -4056,7 +4055,7 @@ pub(crate) mod harness {
.unwrap(),
// This is a legacy/test code path: sharding isn't supported here.
ShardIdentity::unsharded(),
Some(walredo_mgr),
walredo_mgr,
self.tenant_shard_id,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),

View File

@@ -21,6 +21,7 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
use utils::crashsafe;
use crate::config::PageServerConf;
@@ -1660,7 +1661,9 @@ pub(crate) async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
@@ -1679,6 +1682,12 @@ pub(crate) async fn load_tenant(
})?;
}
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
};
let mut location_conf =
Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
location_conf.attach_in_generation(generation);

View File

@@ -199,9 +199,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10).await;
}
tenant.walredo_mgr.maybe_quiesce(period * 10);
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())

View File

@@ -215,8 +215,8 @@ pub struct Timeline {
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
// WAL redo manager. `None` only for broken tenants.
walredo_mgr: Option<Arc<super::WalRedoManager>>,
// WAL redo manager
walredo_mgr: Arc<super::WalRedoManager>,
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
@@ -1427,7 +1427,7 @@ impl Timeline {
tenant_shard_id: TenantShardId,
generation: Generation,
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<super::WalRedoManager>>,
walredo_mgr: Arc<super::WalRedoManager>,
resources: TimelineResources,
pg_version: u32,
state: TimelineState,
@@ -4457,9 +4457,6 @@ impl Timeline {
let img = match self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("reconstruct a page image")

View File

@@ -21,16 +21,12 @@
/// Process lifecycle and abstraction for the IPC protocol.
mod process;
mod process_pool;
pub use process_pool::Pool as ProcessPool;
use utils::sync::heavier_once_cell;
/// Code to apply [`NeonWalRecord`]s.
mod apply_neon;
use crate::config::PageServerConf;
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
};
use crate::repository::Key;
@@ -40,14 +36,12 @@ use bytes::{Bytes, BytesMut};
use pageserver_api::key::key_to_rel_block;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::TenantShardId;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
use self::process::WalRedoProcess;
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
@@ -59,8 +53,7 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: heavier_once_cell::OnceCell<Arc<TaintedProcess>>,
pool: Arc<ProcessPool>,
redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
}
///
@@ -108,7 +101,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -129,11 +121,10 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
Some(WalRedoManagerStatus {
last_redo_at: {
let at = *self.last_redo_at.lock().unwrap();
@@ -143,36 +134,11 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
pid: self.redo_process.get_mut().await.map(|p| p.id()),
pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
})
}
}
/// Pairs a boxed [`WalRedoProcess`] with the tenant shard it is used for.
///
/// The `Drop` impl enters a `walredo` span carrying the tenant and shard ids
/// before the inner process is dropped.
struct TaintedProcess {
// Read by `Drop` to fill in the `tenant_id` / `shard_id` span fields.
tenant_shard_id: TenantShardId,
// `Some` for the whole lifetime of the value; only `Drop` takes it out.
process: Option<Box<WalRedoProcess>>,
}
/// Lets a `TaintedProcess` be used wherever a `&WalRedoProcess` is expected.
impl std::ops::Deref for TaintedProcess {
type Target = WalRedoProcess;
/// Borrows the wrapped process.
///
/// # Panics
///
/// Panics if `process` is `None`, which is only expected after `Drop` has run.
fn deref(&self) -> &Self::Target {
self.process
.as_ref()
.expect("only Self::drop sets it to None")
}
}
impl Drop for TaintedProcess {
/// Drops the wrapped process while a `walredo` span with `tenant_id` and
/// `shard_id` fields is entered.
fn drop(&mut self) {
// ensure tenant_id and shard_id are in the span while the process is dropped
let span = info_span!("walredo", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug());
let _entered = span.enter();
let process = self.process.take().expect("we are the only takers");
// `_entered` is still alive here, so the process is dropped inside the span.
drop(process);
}
}
impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
@@ -180,49 +146,36 @@ impl PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pool: Arc<process_pool::Pool>,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: heavier_once_cell::OnceCell::default(),
pool,
redo_process: RwLock::new(None),
}
}
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
// fallthrough
} else {
return;
let mut guard = self.redo_process.write().unwrap();
*guard = None;
}
} else {
return;
}
} else {
return;
}
drop(
self.redo_process
.get_mut()
.await
.map(|mut guard| guard.take_and_deinit()),
);
}
///
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
async fn apply_batch_postgres(
fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -239,29 +192,46 @@ impl PostgresRedoManager {
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
let proc_once_cell_guard_ref = self
.redo_process
.get_or_init(|init_permit| async move {
let start = Instant::now();
let proc = Arc::new(TaintedProcess {
tenant_shard_id: self.tenant_shard_id,
process: Some(self.pool.get(pg_version).await?),
});
let duration = start.elapsed();
WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"acquired pre-spawned walredo process"
);
anyhow::Ok((proc, init_permit))
})
.await?;
let proc: Arc<process::WalRedoProcess> = {
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
// "upgrade" to write lock to launch the process
drop(proc_guard);
let mut proc_guard = self.redo_process.write().unwrap();
match &*proc_guard {
None => {
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
*proc_guard = Some(Arc::clone(&proc));
proc
}
Some(proc) => Arc::clone(proc),
}
}
Some(proc) => Arc::clone(proc),
}
};
let started_at = std::time::Instant::now();
// Relational WAL records are applied using wal-redo-postgres
let result = proc_once_cell_guard_ref
let result = proc
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
.context("apply_wal_records");
@@ -304,30 +274,33 @@ impl PostgresRedoManager {
);
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
let proc_clone = Arc::clone(&proc_once_cell_guard_ref);
drop(proc_once_cell_guard_ref); // otherwise, the .get_mut() in the next line would deadlock with us holding the guard
match self.redo_process.get_mut().await {
Some(mut guard) => {
if Arc::ptr_eq(&*guard, &proc_clone) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
drop(guard.take_and_deinit());
{
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
// NB: there may still be other concurrent threads using `proc`.
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
// NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
// holding the lock while waiting for the process to exit.
// NB: the drop impl blocks the current threads with a wait() system call for
// the child process. We usually avoid stalling the executor thread that way,
// but, here it's actually somewhat good. If we instead deferred the waiting
// into the background / to tokio, it could happen that if walredo always fails
// immediately, we spawn processes faster than we can SIGKILL & `wait` for them to exit.
// By doing it the way we do here, we limit this risk of run-away to at most
// $num_runtimes * $num_executor_threads.
// the child process. We dropped the `guard` above so that other threads aren't
// affected. But, it's good that the current thread _does_ block to wait.
// If we instead deferred the waiting into the background / to tokio, it could
// happen that if walredo always fails immediately, we spawn processes faster
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
drop(proc_clone);
drop(proc);
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -407,7 +380,7 @@ mod tests {
async fn short_v14_redo() {
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
let h = RedoHarness::new().await.unwrap();
let h = RedoHarness::new().unwrap();
let page = h
.manager
@@ -434,7 +407,7 @@ mod tests {
#[tokio::test]
async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
let h = RedoHarness::new().await.unwrap();
let h = RedoHarness::new().unwrap();
let page = h
.manager
@@ -464,7 +437,7 @@ mod tests {
#[tokio::test]
async fn test_stderr() {
let h = RedoHarness::new().await.unwrap();
let h = RedoHarness::new().unwrap();
h
.manager
.request_redo(
@@ -507,7 +480,7 @@ mod tests {
}
impl RedoHarness {
async fn new() -> anyhow::Result<Self> {
fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
@@ -515,9 +488,7 @@ mod tests {
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let pool = crate::walredo::process_pool::Pool::launch(conf).await;
let manager = PostgresRedoManager::new(conf, tenant_shard_id, pool);
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
Ok(RedoHarness {
_repo_dir: repo_dir,
@@ -529,11 +500,4 @@ mod tests {
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
}
}
impl Drop for RedoHarness {
/// Emits an info-level "RedoHarness dropping" event inside the harness span.
fn drop(&mut self) {
self.span()
.in_scope(|| tracing::info!("RedoHarness dropping"));
}
}
}

View File

@@ -7,7 +7,7 @@ use crate::{
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::reltag::RelTag;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
@@ -29,6 +29,7 @@ mod protocol;
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
@@ -54,8 +55,13 @@ impl WalRedoProcess {
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(conf: &'static PageServerConf, pg_version: u32) -> anyhow::Result<Self> {
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
@@ -64,6 +70,9 @@ impl WalRedoProcess {
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps identify the
// walredo process for a particular tenant when debugging a pageserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
@@ -78,7 +87,7 @@ impl WalRedoProcess {
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child()
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
@@ -139,11 +148,12 @@ impl WalRedoProcess {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), %pg_version))
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
@@ -169,7 +179,7 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(pid=%self.id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) fn apply_wal_records(
&self,
rel: RelTag,
@@ -366,11 +376,13 @@ impl WalRedoProcess {
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(&filename)
.open(path)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors

View File

@@ -1,5 +1,4 @@
use tracing;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
@@ -16,10 +15,12 @@ use std::ops::Deref;
use std::process::Child;
use pageserver_api::shard::TenantShardId;
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
pub(crate) struct NoLeakChild {
// Tenant shard the child was spawned for; `Drop` copies it into the
// `tenant_id` / `shard_id` fields of its `walredo` span.
pub(crate) tenant_id: TenantShardId,
pub(crate) child: Option<Child>,
}
@@ -38,9 +39,12 @@ impl DerefMut for NoLeakChild {
}
impl NoLeakChild {
pub(crate) fn spawn(command: &mut Command) -> io::Result<Self> {
pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
let child = command.spawn()?;
Ok(NoLeakChild { child: Some(child) })
Ok(NoLeakChild {
tenant_id,
child: Some(child),
})
}
pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
@@ -72,7 +76,7 @@ impl NoLeakChild {
// with the wait().
error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
}
debug!("sent SIGKILL, waiting for child to exit");
match child.wait() {
Ok(exit_status) => {
info!(exit_status = %exit_status, "wait successful");
@@ -90,6 +94,7 @@ impl Drop for NoLeakChild {
Some(child) => child,
None => return,
};
let tenant_shard_id = self.tenant_id;
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
@@ -97,7 +102,11 @@ impl Drop for NoLeakChild {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive our dropper.
let span = tracing::info_span!("walredo");
let span = tracing::info_span!(
"walredo",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
);
let _entered = span.enter();
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
})
@@ -107,11 +116,11 @@ impl Drop for NoLeakChild {
}
pub(crate) trait NoLeakChildCommandExt {
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild>;
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
}
impl NoLeakChildCommandExt for Command {
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(self)
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(tenant_id, self)
}
}

View File

@@ -1,57 +0,0 @@
use anyhow::Context;
use utils::pre_spawned_pool;
use crate::config::PageServerConf;
use super::process::WalRedoProcess;
/// Holds one pre-spawned pool client of boxed [`WalRedoProcess`]es for each of
/// the Postgres versions 14, 15 and 16.
pub struct Pool {
// Client for processes launched with `pg_version: 14`.
v14: pre_spawned_pool::Client<Box<WalRedoProcess>>,
// Client for processes launched with `pg_version: 15`.
v15: pre_spawned_pool::Client<Box<WalRedoProcess>>,
// Client for processes launched with `pg_version: 16`.
v16: pre_spawned_pool::Client<Box<WalRedoProcess>>,
}
/// Arguments that `create` passes to `WalRedoProcess::launch`.
struct Launcher {
// Postgres version of the processes this launcher creates.
pg_version: u32,
// Pageserver configuration handed through to `WalRedoProcess::launch`.
conf: &'static PageServerConf,
}
impl utils::pre_spawned_pool::Launcher<Box<WalRedoProcess>> for Launcher {
/// Launches a walredo process for `self.pg_version` and returns it boxed.
///
/// # Errors
///
/// Propagates any error returned by `WalRedoProcess::launch`.
fn create(&self) -> anyhow::Result<Box<WalRedoProcess>> {
Ok(Box::new(WalRedoProcess::launch(
self.conf,
self.pg_version,
)?))
}
}
impl Pool {
/// Launches the pools for Postgres 14, 15 and 16, all sharing the same `conf`.
///
/// The three launches are awaited one after another, in that order.
pub async fn launch(conf: &'static PageServerConf) -> Self {
Self {
v14: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 14,
conf,
})
.await,
v15: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 15,
conf,
})
.await,
v16: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 16,
conf,
})
.await,
}
}
/// Gets a walredo process from the pool that matches `pg_version`.
///
/// # Errors
///
/// Fails with "unknown pg version" for any `pg_version` other than 14, 15 or 16,
/// and with the context "get pre-spawned walredo process" if the pool's `get()` fails.
pub async fn get(&self, pg_version: u32) -> anyhow::Result<Box<WalRedoProcess>> {
// Select the per-version client; any other version is an error.
let pool = match pg_version {
14 => &self.v14,
15 => &self.v15,
16 => &self.v16,
x => anyhow::bail!("unknown pg version: {x}"),
};
pool.get().await.context("get pre-spawned walredo process")
}
}