Compare commits

...

38 Commits

Author SHA1 Message Date
Christian Schwarz
f56931f758 Merge branch 'problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock' into problame/2024-02-walredo-work/prespawn/impl 2024-02-06 16:54:31 +00:00
Christian Schwarz
3ec02d1447 fix deadlock 2024-02-06 16:51:18 +00:00
Christian Schwarz
c93b0a1641 Merge remote-tracking branch 'origin/main' into problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock 2024-02-06 16:27:32 +00:00
Christian Schwarz
f9b6e7bbbe Merge branch 'main' into problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock 2024-02-06 15:33:09 +00:00
Christian Schwarz
e603337f39 WIP 2024-02-05 11:19:19 +00:00
Christian Schwarz
cb4908dedb Merge branch 'problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock' into problame/2024-02-walredo-work/prespawn/impl 2024-02-02 18:36:19 +00:00
Christian Schwarz
f8652fc738 Merge branch 'problame/2024-02-walredo-work/prespawn/heaver-once-cell-for-process-launch' into problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock 2024-02-02 18:36:17 +00:00
Christian Schwarz
bbf954d411 Merge branch 'problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo' into problame/2024-02-walredo-work/prespawn/heaver-once-cell-for-process-launch 2024-02-02 18:36:16 +00:00
Christian Schwarz
efb3e7bb15 Merge branch 'problame/2024-02-walredo-work/prespawn/split-code' into problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo 2024-02-02 18:36:15 +00:00
Christian Schwarz
01688a5ce1 Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code 2024-02-02 18:36:14 +00:00
Christian Schwarz
86bd14181e Merge branch 'problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock' into problame/2024-02-walredo-work/prespawn/impl 2024-02-02 17:34:49 +00:00
Christian Schwarz
64b4b498a4 Revert "remove the walredo usage, that'll be in the next pr"
This reverts commit 20e82629df.
2024-02-02 17:25:25 +00:00
Christian Schwarz
20e82629df remove the walredo usage, that'll be in the next pr 2024-02-02 17:21:59 +00:00
Christian Schwarz
6788bde87a Merge branch 'problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo' into problame/2024-02-walredo-work/prespawn/heaver-once-cell-for-process-launch 2024-02-02 17:16:26 +00:00
Christian Schwarz
283c8abc04 Merge branch 'problame/2024-02-walredo-work/prespawn/split-code' into problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo 2024-02-02 17:16:25 +00:00
Christian Schwarz
647d409f0f Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code 2024-02-02 17:16:24 +00:00
Christian Schwarz
0a09cff816 heavier_once_cell: switch to tokio::sync::RwLock
Using the RwLock reduces contention on the hot path.
2024-02-02 17:09:56 +00:00
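
For intuition, a minimal sketch of what the switch buys, assuming a simplified once-cell (the real utils::sync::heavier_once_cell is more involved): readers of an already-initialized value contend only on a shared tokio::sync::RwLock read guard, and only initialization takes the write lock.

    use tokio::sync::RwLock;

    /// Simplified stand-in for heavier_once_cell; illustration only.
    pub struct OnceCell<T> {
        inner: RwLock<Option<T>>,
    }

    impl<T: Clone> OnceCell<T> {
        pub fn new() -> Self {
            Self { inner: RwLock::new(None) }
        }

        pub async fn get_or_init<F, Fut>(&self, init: F) -> T
        where
            F: FnOnce() -> Fut,
            Fut: std::future::Future<Output = T>,
        {
            // hot path: any number of concurrent readers, shared lock only
            if let Some(v) = self.inner.read().await.as_ref() {
                return v.clone();
            }
            // slow path: exclusive lock; re-check, since another task may
            // have initialized between our read and our write acquisition
            let mut guard = self.inner.write().await;
            if let Some(v) = guard.as_ref() {
                return v.clone();
            }
            let v = init().await;
            *guard = Some(v.clone());
            v
        }
    }
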
Christian Schwarz
c29532cded Revert "Revert "[DO NOT MERGE] refactor(walredo): use replace RwLock with heavier_once_cell""
This reverts commit 6d94d9fb19.
2024-02-02 16:43:14 +00:00
Christian Schwarz
1102d3f0bf Revert "switch to tokio::RwLock"
This reverts commit e8f1af5527.
2024-02-02 16:43:08 +00:00
Christian Schwarz
e8f1af5527 switch to tokio::RwLock 2024-02-02 16:42:54 +00:00
Christian Schwarz
6d94d9fb19 Revert "[DO NOT MERGE] refactor(walredo): use replace RwLock with heavier_once_cell"
This reverts commit 2ab2608d4c.
2024-02-02 16:15:37 +00:00
Christian Schwarz
84169c926a Merge branch 'problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo' into problame/2024-02-walredo-work/prespawn/heaver-once-cell-for-process-launch 2024-02-02 15:53:57 +00:00
Christian Schwarz
acdebf2cec Merge branch 'problame/2024-02-walredo-work/prespawn/split-code' into problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo 2024-02-02 15:53:56 +00:00
Christian Schwarz
44cb5e5be6 Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code 2024-02-02 15:53:55 +00:00
Christian Schwarz
2ab2608d4c [DO NOT MERGE] refactor(walredo): use replace RwLock with heavier_once_cell
The API is nice, exactly what we want, but we would want a more
optimistic underlying sync primitive.
2024-02-02 15:36:15 +00:00
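
The calling convention referred to here, as a hedged mock (the exact signatures in utils::sync::heavier_once_cell are assumptions): get_or_init hands the init closure a permit that must be returned together with the value, and an initialized cell can later be deinitialized and re-initialized, which the walredo hunks below rely on to take a failed process out of rotation.

    use tokio::sync::Mutex;

    /// Illustration only; the real cell lives in utils::sync::heavier_once_cell.
    pub struct InitPermit(());

    pub struct OnceCell<T> {
        slot: Mutex<Option<T>>,
    }

    impl<T: Clone> OnceCell<T> {
        pub fn default() -> Self {
            OnceCell { slot: Mutex::new(None) }
        }

        pub async fn get_or_init<F, Fut, E>(&self, init: F) -> Result<T, E>
        where
            F: FnOnce(InitPermit) -> Fut,
            Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
        {
            // holding the lock across init serializes concurrent initializers
            let mut slot = self.slot.lock().await;
            if let Some(v) = slot.as_ref() {
                return Ok(v.clone());
            }
            let (v, _permit) = init(InitPermit(())).await?;
            *slot = Some(v.clone());
            Ok(v)
        }

        /// Take the value out, leaving the cell uninitialized.
        pub async fn take_and_deinit(&self) -> Option<T> {
            self.slot.lock().await.take()
        }
    }
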
Christian Schwarz
de7d366df3 wip 2024-02-02 15:25:39 +00:00
Christian Schwarz
7c1b2dc9ef Merge branch 'problame/2024-02-walredo-work/prespawn/broken-tenants-no-walredo' into problame/2024-02-walredo-work/prespawn/impl 2024-02-02 14:56:29 +00:00
Christian Schwarz
f73aa3eb32 refactor(walredo): avoid the need for a WalRedoManager in broken tenants
When we'll later introduce a global pool of pre-spawned walredo
processes (https://github.com/neondatabase/neon/issues/6581), this
refactoring avoids plumbing through the reference to the pool to all the
places where we create a broken tenant.

Builds atop the refactoring in #6583
2024-02-02 14:52:53 +00:00
Christian Schwarz
2374e1318e Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code 2024-02-02 14:42:30 +00:00
Christian Schwarz
8fe3c9ff55 wip 2024-02-02 14:42:00 +00:00
Christian Schwarz
8e8890530c wip 2024-02-02 14:26:26 +00:00
Christian Schwarz
014147a644 wip 2024-02-02 11:50:43 +00:00
Christian Schwarz
aa0e9fdaef Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code 2024-02-02 11:50:15 +00:00
Christian Schwarz
9b8aa270b8 cleanups 2024-02-02 10:19:18 +00:00
Christian Schwarz
4571db1750 extract NeonWalRecord apply logic 2024-02-02 10:14:50 +00:00
Christian Schwarz
6fe534fea3 move protocol as child module of process, where it belongs 2024-02-02 10:05:50 +00:00
Christian Schwarz
8b258e20a0 move more stuff around 2024-02-02 10:03:40 +00:00
Christian Schwarz
29eec6c563 split off walredo process & protocol from walredo.rs 2024-02-02 09:59:31 +00:00
15 changed files with 397 additions and 135 deletions

Cargo.lock generated
View File

@@ -196,6 +196,19 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener 4.0.0",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.4.5"
@@ -2377,7 +2390,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
dependencies = [
"anyhow",
"async-channel",
"async-channel 1.9.0",
"base64 0.13.1",
"futures-lite",
"infer",
@@ -6281,6 +6294,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-channel 2.1.1",
"async-trait",
"bincode",
"byteorder",

View File

@@ -40,6 +40,7 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-channel = "2.1.1"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.18"
azure_identity = "0.18"

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies]
arc-swap.workspace = true
async-channel.workspace = true
sentry.workspace = true
async-trait.workspace = true
anyhow.workspace = true

View File

@@ -87,6 +87,8 @@ pub mod failpoint_support;
pub mod yielding_loop;
pub mod pre_spawned_pool;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -0,0 +1,159 @@
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tracing::{debug, error, instrument};
pub struct Client<T> {
cmds_tx: mpsc::UnboundedSender<Command>,
items_rx: async_channel::Receiver<CreatedItem<T>>,
}
pub trait Launcher<T> {
fn what() -> &'static str;
fn create(&self) -> anyhow::Result<T>;
}
#[derive(Debug)]
enum Command {
SetSlotCount(usize),
}
#[derive(thiserror::Error, Debug)]
pub enum GetError {
#[error("shutting down")]
ShuttingDown,
}
impl<T> Client<T> {
pub async fn get(&self) -> Result<T, GetError> {
self.items_rx
.recv()
.await
.map_err(|_| GetError::ShuttingDown)
.map(|CreatedItem { permit, item }| {
drop(permit); // allow a new one to be pre-spawned
item
})
}
pub fn set_slot_count_nowait(&self, count: usize) {
self.cmds_tx
.send(Command::SetSlotCount(count))
.expect("while cmds_tx is open, the pool task doesn't exit");
}
}
pub struct Pool<T, L>
where
T: Send + 'static,
L: Send + Launcher<T> + 'static,
{
launcher: L,
cmds_rx: mpsc::UnboundedReceiver<Command>,
items_tx: async_channel::Sender<CreatedItem<T>>,
}
struct CreatedItem<T> {
permit: OwnedSemaphorePermit,
item: T,
}
impl<T, L> Pool<T, L>
where
T: Send + 'static,
L: Send + Launcher<T> + 'static,
{
pub async fn launch(launcher: L) -> Client<T> {
let (cmds_tx, cmds_rx) = mpsc::unbounded_channel(); // callers are limited to mgmt api
let (items_tx, items_rx) = async_channel::unbounded(); // task() limits pending items itself
// task gets cancelled by dropping the last Client
tokio::spawn(
Self {
launcher,
cmds_rx,
items_tx,
}
.task(),
);
Client { cmds_tx, items_rx }
}
#[instrument(skip_all)]
async fn task(mut self) {
let initial = 0;
let mut configured = initial;
let pending_items = Arc::new(tokio::sync::Semaphore::new(initial));
let mut need_forget = 0;
let mut last_launcher_failure_at = None;
loop {
debug!(
configured,
need_forget,
available = pending_items.available_permits(),
last_launcher_failure_secs_ago =
last_launcher_failure_at.map(|at| at.elapsed().as_secs_f64()),
"iteration"
);
// Produce one item: acquire a permit, consuming pending slot-count
// reductions by forgetting permits, then ask the launcher for an item,
// sleeping briefly between failed attempts so a persistently failing
// launcher doesn't busy-loop.
let launcher = &self.launcher;
let try_launch = async {
    loop {
        let permit = Arc::clone(&pending_items)
            .acquire_owned()
            .await
            .expect("we never close this semaphore");
        if need_forget > 0 {
            debug!("forgetting permit to reduce semaphore count");
            need_forget -= 1;
            permit.forget();
            continue;
        }
        debug!("creating item");
        match launcher.create() {
            Ok(item) => break CreatedItem { permit, item },
            Err(e) => {
                error!("{} launcher failed to create item: {e:#}", L::what());
                last_launcher_failure_at = Some(std::time::Instant::now());
                drop(permit); // release the slot while we back off
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        }
    }
};
let cmd = tokio::select! {
    res = self.cmds_rx.recv() => {
        match res {
            Some(cmd) => cmd,
            None => return, // dropping tx acts as cancellation
        }
    }
    created = try_launch => {
        match self.items_tx.send(created).await {
            Ok(()) => continue,
            Err(_) => {
                debug!("stopping, client has gone away");
                return;
            }
        }
    }
};
debug!(?cmd, "received command");
match cmd {
Command::SetSlotCount(desired) => {
if desired > configured {
pending_items.add_permits(desired - configured);
} else if desired < configured {
need_forget += configured - desired;
}
configured = desired;
}
}
}
}
}
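
A hedged usage sketch of the Client/Pool API above, with a toy Launcher; everything except the Pool, Client, and Launcher items is illustrative:

    struct FortyTwo;

    impl Launcher<u32> for FortyTwo {
        fn what() -> &'static str {
            "forty-two"
        }
        fn create(&self) -> anyhow::Result<u32> {
            Ok(42)
        }
    }

    async fn example() -> anyhow::Result<()> {
        // the pool starts with zero slots, so nothing is created yet
        let client = Pool::launch(FortyTwo).await;
        // from now on, keep up to 4 items pre-created
        client.set_slot_count_nowait(4);
        // taking an item drops its permit, so the pool task tops back up to 4
        let item = client.get().await?;
        assert_eq!(item, 42);
        Ok(())
    }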

View File

@@ -309,6 +309,9 @@ fn start_pageserver(
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
let walredo_process_pool =
Arc::new(COMPUTE_REQUEST_RUNTIME.block_on(pageserver::walredo::ProcessPool::launch(conf)));
// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
@@ -414,6 +417,7 @@ fn start_pageserver(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
walredo_process_pool: walredo_process_pool.clone(),
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
@@ -545,6 +549,7 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
walredo_process_pool,
)
.context("Failed to initialize router state")?,
);

View File

@@ -55,7 +55,9 @@ use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::SpawnMode;
use crate::tenant::TenantSharedResources;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::walredo;
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
@@ -97,6 +99,7 @@ pub struct State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
walredo_process_pool: Arc<walredo::ProcessPool>,
}
impl State {
@@ -110,6 +113,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
walredo_process_pool: Arc<walredo::ProcessPool>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -125,8 +129,18 @@ impl State {
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
walredo_process_pool,
})
}
pub(crate) fn tenant_shared_resources(&self) -> TenantSharedResources {
TenantSharedResources {
walredo_process_pool: Arc::clone(&self.walredo_process_pool),
broker_client: self.broker_client.clone(),
remote_storage: self.remote_storage.clone(),
deletion_queue_client: self.deletion_queue_client.clone(),
}
}
}
#[inline(always)]
@@ -903,9 +917,7 @@ async fn tenant_load_handler(
state.conf,
tenant_id,
generation,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
state.tenant_shared_resources(),
&ctx,
)
.instrument(info_span!("load", %tenant_id))
@@ -980,7 +992,7 @@ async fn tenant_status(
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
},
walredo: tenant.wal_redo_manager_status(),
walredo: tenant.wal_redo_manager_status().await,
timelines: tenant.list_timeline_ids(),
})
}

View File

@@ -1652,7 +1652,7 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
});
#[rustfmt::skip]
pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
pub(crate) static WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_process_launch_duration",
"Histogram of the duration of successful WalRedoProcess::launch calls",
@@ -2452,7 +2452,7 @@ pub fn preinitialize_metrics() {
&WAL_REDO_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,
&WAL_REDO_BYTES_HISTOGRAM,
&WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
&WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
]
.into_iter()
.for_each(|h| {

View File

@@ -189,6 +189,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub walredo_process_pool: Arc<crate::walredo::ProcessPool>,
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
@@ -334,9 +335,9 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
@@ -368,9 +369,9 @@ impl WalRedoManager {
}
}
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(m) => m.status(),
WalRedoManager::Prod(m) => m.status().await,
#[cfg(test)]
WalRedoManager::Test(_) => None,
}
@@ -616,17 +617,18 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
let TenantSharedResources {
walredo_process_pool,
broker_client,
remote_storage,
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
walredo_process_pool,
)));
let attach_mode = attached_conf.location.attach_mode;
let generation = attached_conf.location.generation;
@@ -1973,8 +1975,11 @@ impl Tenant {
self.generation
}
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
let Some(mgr) = self.walredo_mgr.as_ref() else {
return None;
};
mgr.status().await
}
/// Changes tenant status to active, unless shutdown was already requested.

View File

@@ -21,7 +21,6 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
use utils::crashsafe;
use crate::config::PageServerConf;
@@ -1661,9 +1660,7 @@ pub(crate) async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
@@ -1682,12 +1679,6 @@ pub(crate) async fn load_tenant(
})?;
}
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
};
let mut location_conf =
Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
location_conf.attach_in_generation(generation);

View File

@@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
walredo_mgr.maybe_quiesce(period * 10).await;
}
// Sleep

View File

@@ -21,12 +21,16 @@
/// Process lifecycle and abstraction for the IPC protocol.
mod process;
mod process_pool;
pub use process_pool::Pool as ProcessPool;
use utils::sync::heavier_once_cell;
/// Code to apply [`NeonWalRecord`]s.
mod apply_neon;
use crate::config::PageServerConf;
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
};
use crate::repository::Key;
@@ -36,12 +40,14 @@ use bytes::{Bytes, BytesMut};
use pageserver_api::key::key_to_rel_block;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::TenantShardId;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
use self::process::WalRedoProcess;
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
@@ -53,7 +59,8 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
redo_process: heavier_once_cell::OnceCell<Arc<TaintedProcess>>,
pool: Arc<ProcessPool>,
}
///
@@ -101,6 +108,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -121,10 +129,11 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
Some(WalRedoManagerStatus {
last_redo_at: {
let at = *self.last_redo_at.lock().unwrap();
@@ -134,11 +143,36 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
pid: self.redo_process.get_mut().await.map(|p| p.id()),
})
}
}
struct TaintedProcess {
tenant_shard_id: TenantShardId,
process: Option<Box<WalRedoProcess>>,
}
impl std::ops::Deref for TaintedProcess {
type Target = WalRedoProcess;
fn deref(&self) -> &Self::Target {
self.process
.as_ref()
.expect("only Self::drop sets it to None")
}
}
impl Drop for TaintedProcess {
fn drop(&mut self) {
// ensure tenant_id and shard_id are in the span
let span = info_span!("walredo", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug());
let _entered = span.enter();
let process = self.process.take().expect("we are the only takers");
drop(process);
}
}
impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
@@ -146,36 +180,49 @@ impl PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pool: Arc<process_pool::Pool>,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: RwLock::new(None),
redo_process: heavier_once_cell::OnceCell::default(),
pool,
}
}
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
let mut guard = self.redo_process.write().unwrap();
*guard = None;
// fallthrough
} else {
return;
}
} else {
return;
}
} else {
return;
}
drop(
self.redo_process
.get_mut()
.await
.map(|mut guard| guard.take_and_deinit()),
);
}
///
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -192,46 +239,29 @@ impl PostgresRedoManager {
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
let proc: Arc<process::WalRedoProcess> = {
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
// "upgrade" to write lock to launch the process
drop(proc_guard);
let mut proc_guard = self.redo_process.write().unwrap();
match &*proc_guard {
None => {
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
*proc_guard = Some(Arc::clone(&proc));
proc
}
Some(proc) => Arc::clone(proc),
}
}
Some(proc) => Arc::clone(proc),
}
};
let proc_once_cell_guard_ref = self
.redo_process
.get_or_init(|init_permit| async move {
let start = Instant::now();
let proc = Arc::new(TaintedProcess {
tenant_shard_id: self.tenant_shard_id,
process: Some(self.pool.get(pg_version).await?),
});
let duration = start.elapsed();
WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"acquired pre-spawned walredo process"
);
anyhow::Ok((proc, init_permit))
})
.await?;
let started_at = std::time::Instant::now();
// Relational WAL records are applied using wal-redo-postgres
let result = proc
let result = proc_once_cell_guard_ref
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
.context("apply_wal_records");
@@ -274,33 +304,30 @@ impl PostgresRedoManager {
);
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
{
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
let proc_clone = Arc::clone(&proc_once_cell_guard_ref);
drop(proc_once_cell_guard_ref); // otherwise, the .get_mut() in the next line would deadlock with us holding the guard
match self.redo_process.get_mut().await {
Some(mut guard) => {
if Arc::ptr_eq(&*guard, &proc_clone) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
drop(guard.take_and_deinit());
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
// NB: there may still be other concurrent threads using `proc`.
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
// NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
// holding the lock while waiting for the process to exit.
// NB: the drop impl blocks the current threads with a wait() system call for
// the child process. We dropped the `guard` above so that other threads aren't
// affected. But, it's good that the current thread _does_ block to wait.
// If we instead deferred the waiting into the background / to tokio, it could
// happen that if walredo always fails immediately, we spawn processes faster
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// the child process. We usually avoid stalling the executor thread that way,
// but, here it's actually somewhat good. If we instead deferred the waiting
// into the background / to tokio, it could happen that if walredo always fails
// immediately, we spawn processes faster than we can SIGKILL & `wait` for them to exit.
// By doing it the way we do here, we limit this risk of run-away to at most
// $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
drop(proc);
drop(proc_clone);
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -380,7 +407,7 @@ mod tests {
async fn short_v14_redo() {
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
let h = RedoHarness::new().unwrap();
let h = RedoHarness::new().await.unwrap();
let page = h
.manager
@@ -407,7 +434,7 @@ mod tests {
#[tokio::test]
async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
let h = RedoHarness::new().unwrap();
let h = RedoHarness::new().await.unwrap();
let page = h
.manager
@@ -437,7 +464,7 @@ mod tests {
#[tokio::test]
async fn test_stderr() {
let h = RedoHarness::new().unwrap();
let h = RedoHarness::new().await.unwrap();
h
.manager
.request_redo(
@@ -480,7 +507,7 @@ mod tests {
}
impl RedoHarness {
fn new() -> anyhow::Result<Self> {
async fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
@@ -488,7 +515,9 @@ mod tests {
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
let pool = crate::walredo::process_pool::Pool::launch(conf).await;
let manager = PostgresRedoManager::new(conf, tenant_shard_id, pool);
Ok(RedoHarness {
_repo_dir: repo_dir,
@@ -500,4 +529,11 @@ mod tests {
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
}
}
impl Drop for RedoHarness {
fn drop(&mut self) {
self.span()
.in_scope(|| tracing::info!("RedoHarness dropping"));
}
}
}

View File

@@ -7,7 +7,7 @@ use crate::{
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use pageserver_api::reltag::RelTag;
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
@@ -29,7 +29,6 @@ mod protocol;
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
@@ -55,13 +54,8 @@ impl WalRedoProcess {
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
pub(crate) fn launch(conf: &'static PageServerConf, pg_version: u32) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
@@ -70,9 +64,6 @@ impl WalRedoProcess {
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps indentify the
// walredo process for a particular tenant when debugging a pagserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
@@ -87,7 +78,7 @@ impl WalRedoProcess {
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child(tenant_shard_id)
.spawn_no_leak_child()
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
@@ -148,12 +139,11 @@ impl WalRedoProcess {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
@@ -179,7 +169,7 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
#[instrument(skip_all, fields(pid=%self.id()))]
pub(crate) fn apply_wal_records(
&self,
rel: RelTag,
@@ -376,13 +366,11 @@ impl WalRedoProcess {
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(path)
.open(&filename)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors

View File

@@ -1,4 +1,5 @@
use tracing;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
@@ -15,12 +16,10 @@ use std::ops::Deref;
use std::process::Child;
use pageserver_api::shard::TenantShardId;
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
pub(crate) struct NoLeakChild {
pub(crate) tenant_id: TenantShardId,
pub(crate) child: Option<Child>,
}
@@ -39,12 +38,9 @@ impl DerefMut for NoLeakChild {
}
impl NoLeakChild {
pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
pub(crate) fn spawn(command: &mut Command) -> io::Result<Self> {
let child = command.spawn()?;
Ok(NoLeakChild {
tenant_id,
child: Some(child),
})
Ok(NoLeakChild { child: Some(child) })
}
pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
@@ -76,7 +72,7 @@ impl NoLeakChild {
// with the wait().
error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
}
debug!("sent SIGKILL, waiting for child to exit");
match child.wait() {
Ok(exit_status) => {
info!(exit_status = %exit_status, "wait successful");
@@ -94,7 +90,6 @@ impl Drop for NoLeakChild {
Some(child) => child,
None => return,
};
let tenant_shard_id = self.tenant_id;
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
@@ -102,11 +97,7 @@ impl Drop for NoLeakChild {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive of our dropper.
let span = tracing::info_span!(
"walredo",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
);
let span = tracing::info_span!("walredo");
let _entered = span.enter();
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
})
@@ -116,11 +107,11 @@ impl Drop for NoLeakChild {
}
pub(crate) trait NoLeakChildCommandExt {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild>;
}
impl NoLeakChildCommandExt for Command {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(tenant_id, self)
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(self)
}
}

View File

@@ -0,0 +1,57 @@
use anyhow::Context;
use utils::pre_spawned_pool;
use crate::config::PageServerConf;
use super::process::WalRedoProcess;
pub struct Pool {
v14: pre_spawned_pool::Client<Box<WalRedoProcess>>,
v15: pre_spawned_pool::Client<Box<WalRedoProcess>>,
v16: pre_spawned_pool::Client<Box<WalRedoProcess>>,
}
struct Launcher {
pg_version: u32,
conf: &'static PageServerConf,
}
impl utils::pre_spawned_pool::Launcher<Box<WalRedoProcess>> for Launcher {
    fn what() -> &'static str {
        "walredo process"
    }
    fn create(&self) -> anyhow::Result<Box<WalRedoProcess>> {
        Ok(Box::new(WalRedoProcess::launch(self.conf, self.pg_version)?))
    }
}
impl Pool {
pub async fn launch(conf: &'static PageServerConf) -> Self {
Self {
v14: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 14,
conf,
})
.await,
v15: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 15,
conf,
})
.await,
v16: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 16,
conf,
})
.await,
}
}
pub async fn get(&self, pg_version: u32) -> anyhow::Result<Box<WalRedoProcess>> {
let pool = match pg_version {
14 => &self.v14,
15 => &self.v15,
16 => &self.v16,
x => anyhow::bail!("unknown pg version: {x}"),
};
pool.get().await.context("get pre-spawned walredo process")
}
}
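
A hedged usage sketch of this wrapper; `conf` stands in for the real &'static PageServerConf that pageserver.rs passes in at startup:

    async fn example(conf: &'static PageServerConf) -> anyhow::Result<()> {
        // one pre-spawned sub-pool per supported Postgres major version
        let pool = Pool::launch(conf).await;
        // hands out a ready process; unsupported versions (e.g. 13) error out
        let process = pool.get(16).await?;
        // dropping the process triggers NoLeakChild's kill-and-wait
        drop(process);
        Ok(())
    }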