Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-15 12:10:37 +00:00)

Compare commits: 38 commits, max_locks...problame/2
| SHA1 |
|---|
| f56931f758 |
| 3ec02d1447 |
| c93b0a1641 |
| f9b6e7bbbe |
| e603337f39 |
| cb4908dedb |
| f8652fc738 |
| bbf954d411 |
| efb3e7bb15 |
| 01688a5ce1 |
| 86bd14181e |
| 64b4b498a4 |
| 20e82629df |
| 6788bde87a |
| 283c8abc04 |
| 647d409f0f |
| 0a09cff816 |
| c29532cded |
| 1102d3f0bf |
| e8f1af5527 |
| 6d94d9fb19 |
| 84169c926a |
| acdebf2cec |
| 44cb5e5be6 |
| 2ab2608d4c |
| de7d366df3 |
| 7c1b2dc9ef |
| f73aa3eb32 |
| 2374e1318e |
| 8fe3c9ff55 |
| 8e8890530c |
| 014147a644 |
| aa0e9fdaef |
| 9b8aa270b8 |
| 4571db1750 |
| 6fe534fea3 |
| 8b258e20a0 |
| 29eec6c563 |
Cargo.lock (generated): 16 changed lines
@@ -196,6 +196,19 @@ dependencies = [
  "futures-core",
 ]
 
+[[package]]
+name = "async-channel"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
+dependencies = [
+ "concurrent-queue",
+ "event-listener 4.0.0",
+ "event-listener-strategy",
+ "futures-core",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "async-compression"
 version = "0.4.5"

@@ -2377,7 +2390,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
 dependencies = [
  "anyhow",
- "async-channel",
+ "async-channel 1.9.0",
  "base64 0.13.1",
  "futures-lite",
  "infer",

@@ -6281,6 +6294,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "arc-swap",
+ "async-channel 2.1.1",
  "async-trait",
  "bincode",
  "byteorder",
@@ -40,6 +40,7 @@ license = "Apache-2.0"
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
 arc-swap = "1.6"
+async-channel = "2.1.1"
 async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
 azure_core = "0.18"
 azure_identity = "0.18"
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
 
 [dependencies]
 arc-swap.workspace = true
+async-channel.workspace = true
 sentry.workspace = true
 async-trait.workspace = true
 anyhow.workspace = true
@@ -87,6 +87,8 @@ pub mod failpoint_support;
 
 pub mod yielding_loop;
 
+pub mod pre_spawned_pool;
+
 /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
 ///
 /// we have several cases:
libs/utils/src/pre_spawned_pool.rs (new file, 159 lines)
@@ -0,0 +1,159 @@
use std::sync::Arc;

use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tracing::{debug, instrument};

use crate::backoff;

pub struct Client<T> {
    cmds_tx: mpsc::UnboundedSender<Command>,
    items_rx: async_channel::Receiver<CreatedItem<T>>,
}

pub trait Launcher<T> {
    fn what() -> &'static str;
    fn create(&self) -> anyhow::Result<T>;
}

#[derive(Debug)]
enum Command {
    SetSlotCount(usize),
}

#[derive(thiserror::Error, Debug)]
pub enum GetError {
    #[error("shutting down")]
    ShuttingDown,
}

impl<T> Client<T> {
    pub async fn get(&self) -> Result<T, GetError> {
        self.items_rx
            .recv()
            .await
            .map_err(|_| GetError::ShuttingDown)
            .map(|CreatedItem { permit, item }| {
                drop(permit); // allow a new one to be pre-spawned
                item
            })
    }

    pub fn set_slot_count_nowait(&self, count: usize) {
        self.cmds_tx
            .send(Command::SetSlotCount(count))
            .expect("while cmds_tx is open, the pool task doesn't exit");
    }
}

pub struct Pool<T, L>
where
    T: Send + 'static,
    L: Send + Launcher<T> + 'static,
{
    launcher: L,
    cmds_rx: mpsc::UnboundedReceiver<Command>,
    items_tx: async_channel::Sender<CreatedItem<T>>,
}

struct CreatedItem<T> {
    permit: OwnedSemaphorePermit,
    item: T,
}

impl<T, L> Pool<T, L>
where
    T: Send + 'static,
    L: Send + Launcher<T> + 'static,
{
    pub async fn launch(launcher: L) -> Client<T> {
        let (cmds_tx, cmds_rx) = mpsc::unbounded_channel(); // callers are limited to mgmt api
        let (items_tx, items_rx) = async_channel::unbounded(); // task() limits pending items itself

        // task gets cancelled by dropping the last Client
        tokio::spawn(
            Self {
                launcher,
                cmds_rx,
                items_tx,
            }
            .task(),
        );
        Client { cmds_tx, items_rx }
    }

    #[instrument(skip_all)]
    async fn task(mut self) {
        let initial = 0;
        let mut configured = initial;
        let pending_items = Arc::new(tokio::sync::Semaphore::new(initial));
        let mut need_forget = 0;
        let mut last_launcher_failure_at = None;
        loop {
            debug!(
                configured,
                need_forget,
                available = pending_items.available_permits(),
                last_launcher_failure_secs_ago =
                    last_launcher_failure_at.map(|at| at.elapsed().as_secs_f64()),
                "iteration"
            );
            let try_launch_once = || async {
                let permit = Arc::clone(&pending_items)
                    .acquire_owned()
                    .expect("we never close this semaphore");
                if need_forget > 0 {
                    debug!("forgetting permit to reduce semaphore count");
                    need_forget -= 1;
                    permit.forget();
                    continue;
                }
                debug!("creating item");
                let item = match self.launcher.create() {
                    Ok(item) => item,
                    Err(e) => {
                        error!(
                            "launcher failed to create item: {}",
                            report_compact_sources(&e)
                        );
                    }
                };
            };
            let try_launch_retrying = backoff::retry(
                try_launch_once,
                |_| false,
                0,
                u32::MAX,
                L::what(),
                CancellationToken::new(),
            );
            let cmd = tokio::select! {
                res = self.cmds_rx.recv() => {
                    match res {
                        Some(cmd) => cmd,
                        None => return, // dropping tx acts as cancellation
                    }
                }
                item = try_launch => {
                    match self.items_tx.send(CreatedItem { permit, item }).await {
                        Ok(()) => continue,
                        Err(_) => {
                            debug!("stopping, client has gone away");
                            return;
                        }
                    }
                }
            };
            debug!(?cmd, "received command");
            match cmd {
                Command::SetSlotCount(desired) => {
                    if desired > configured {
                        pending_items.add_permits(desired - configured);
                    } else if desired < configured {
                        need_forget += configured - desired;
                    }
                    configured = desired;
                }
            }
        }
    }
}
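For orientation, here is a minimal usage sketch of the `pre_spawned_pool` API introduced above. The `DummyLauncher` type and the `main` function are invented for illustration and are not part of the diff; the sketch also assumes the draft `task()` loop is completed so that `Pool::launch` and `Client::get` behave as intended.

```rust
use utils::pre_spawned_pool::{Client, Launcher, Pool};

/// Hypothetical launcher: in the pageserver this would spawn a walredo process;
/// here it just produces a cheap stand-in value.
struct DummyLauncher;

impl Launcher<String> for DummyLauncher {
    fn what() -> &'static str {
        "dummy item"
    }

    fn create(&self) -> anyhow::Result<String> {
        Ok("pre-spawned item".to_string())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Spawns the background task that keeps `count` items ready.
    let client: Client<String> = Pool::launch(DummyLauncher).await;
    client.set_slot_count_nowait(4);

    // Take one ready item; dropping its permit lets the task pre-spawn a replacement.
    let item = client.get().await?;
    println!("{item}");
    Ok(())
}
```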
@@ -309,6 +309,9 @@ fn start_pageserver(
     info!("Starting pageserver pg protocol handler on {pg_addr}");
     let pageserver_listener = tcp_listener::bind(pg_addr)?;
 
+    let walredo_process_pool =
+        Arc::new(COMPUTE_REQUEST_RUNTIME.block_on(pageserver::walredo::ProcessPool::launch(conf)));
+
     // Launch broker client
     // The storage_broker::connect call needs to happen inside a tokio runtime thread.
     let broker_client = WALRECEIVER_RUNTIME

@@ -414,6 +417,7 @@ fn start_pageserver(
     let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
         conf,
         TenantSharedResources {
+            walredo_process_pool: walredo_process_pool.clone(),
             broker_client: broker_client.clone(),
             remote_storage: remote_storage.clone(),
             deletion_queue_client,

@@ -545,6 +549,7 @@ fn start_pageserver(
             disk_usage_eviction_state,
             deletion_queue.new_client(),
             secondary_controller,
+            walredo_process_pool,
         )
         .context("Failed to initialize router state")?,
     );
 
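The pool is created once during synchronous startup by blocking on one of the pageserver's runtimes and is then shared via `Arc` with the tenant manager and the HTTP state. A self-contained sketch of that `block_on` wiring pattern (the runtime construction and the pool stand-in below are illustrative, not the pageserver's actual types):

```rust
use std::sync::Arc;

fn main() {
    // Illustrative runtime; the pageserver uses its pre-built COMPUTE_REQUEST_RUNTIME.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build runtime");

    // Stand-in for pageserver::walredo::ProcessPool::launch(conf): any async
    // constructor can be driven to completion from sync startup code this way,
    // and the result shared afterwards via Arc.
    let walredo_process_pool = Arc::new(runtime.block_on(async { "pool handle".to_string() }));

    println!("initialized: {walredo_process_pool}");
}
```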
@@ -55,7 +55,9 @@ use crate::tenant::storage_layer::LayerAccessStatsReset;
 use crate::tenant::timeline::CompactFlags;
 use crate::tenant::timeline::Timeline;
 use crate::tenant::SpawnMode;
+use crate::tenant::TenantSharedResources;
 use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
+use crate::walredo;
 use crate::{config::PageServerConf, tenant::mgr};
 use crate::{disk_usage_eviction_task, tenant};
 use pageserver_api::models::{

@@ -97,6 +99,7 @@ pub struct State {
     disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
     deletion_queue_client: DeletionQueueClient,
     secondary_controller: SecondaryController,
+    walredo_process_pool: Arc<walredo::ProcessPool>,
 }
 
 impl State {

@@ -110,6 +113,7 @@ impl State {
         disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
         deletion_queue_client: DeletionQueueClient,
         secondary_controller: SecondaryController,
+        walredo_process_pool: Arc<walredo::ProcessPool>,
     ) -> anyhow::Result<Self> {
         let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
             .iter()

@@ -125,8 +129,18 @@ impl State {
             disk_usage_eviction_state,
             deletion_queue_client,
             secondary_controller,
+            walredo_process_pool,
         })
     }
+
+    pub(crate) fn tenant_shared_resources(&self) -> TenantSharedResources {
+        TenantSharedResources {
+            walredo_process_pool: Arc::clone(&self.walredo_process_pool),
+            broker_client: self.broker_client.clone(),
+            remote_storage: self.remote_storage.clone(),
+            deletion_queue_client: self.deletion_queue_client.clone(),
+        }
+    }
 }
 
 #[inline(always)]

@@ -903,9 +917,7 @@ async fn tenant_load_handler(
         state.conf,
         tenant_id,
         generation,
-        state.broker_client.clone(),
-        state.remote_storage.clone(),
-        state.deletion_queue_client.clone(),
+        state.tenant_shared_resources(),
         &ctx,
     )
     .instrument(info_span!("load", %tenant_id))

@@ -980,7 +992,7 @@ async fn tenant_status(
             attachment_status: state.attachment_status(),
             generation: tenant.generation().into(),
         },
-        walredo: tenant.wal_redo_manager_status(),
+        walredo: tenant.wal_redo_manager_status().await,
         timelines: tenant.list_timeline_ids(),
     })
 }
@@ -1652,7 +1652,7 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
 });
 
 #[rustfmt::skip]
-pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
+pub(crate) static WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "pageserver_wal_redo_process_launch_duration",
         "Histogram of the duration of successful WalRedoProcess::launch calls",

@@ -2452,7 +2452,7 @@ pub fn preinitialize_metrics() {
         &WAL_REDO_TIME,
         &WAL_REDO_RECORDS_HISTOGRAM,
         &WAL_REDO_BYTES_HISTOGRAM,
-        &WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
+        &WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
     ]
     .into_iter()
     .for_each(|h| {
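The renamed metric is still fed the same way: take an `Instant` before the acquisition and observe the elapsed seconds. A standalone sketch using the plain `prometheus` crate (the pageserver registers its histograms through its own `metrics` wrapper and keeps the `pageserver_wal_redo_process_launch_duration` name; the metric name below is illustrative only):

```rust
use std::time::Instant;

fn main() {
    // Illustrative histogram; buckets are the crate defaults.
    let histo = prometheus::register_histogram!(
        "walredo_process_acquisition_seconds",
        "Time to acquire a pre-spawned walredo process"
    )
    .expect("register histogram");

    let start = Instant::now();
    // ... acquire the process here ...
    histo.observe(start.elapsed().as_secs_f64());
}
```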
@@ -189,6 +189,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
 /// as the shared remote storage client and process initialization state.
 #[derive(Clone)]
 pub struct TenantSharedResources {
+    pub walredo_process_pool: Arc<crate::walredo::ProcessPool>,
     pub broker_client: storage_broker::BrokerClientChannel,
     pub remote_storage: Option<GenericRemoteStorage>,
     pub deletion_queue_client: DeletionQueueClient,

@@ -334,9 +335,9 @@ impl From<harness::TestRedoManager> for WalRedoManager {
 }
 
 impl WalRedoManager {
-    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
+    pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
         match self {
-            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
+            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
             #[cfg(test)]
             Self::Test(_) => {
                 // Not applicable to test redo manager

@@ -368,9 +369,9 @@ impl WalRedoManager {
         }
     }
 
-    pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
+    pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
         match self {
-            WalRedoManager::Prod(m) => m.status(),
+            WalRedoManager::Prod(m) => m.status().await,
             #[cfg(test)]
             WalRedoManager::Test(_) => None,
         }

@@ -616,17 +617,18 @@ impl Tenant {
         mode: SpawnMode,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
-        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
-            conf,
-            tenant_shard_id,
-        )));
-
         let TenantSharedResources {
+            walredo_process_pool,
             broker_client,
             remote_storage,
             deletion_queue_client,
         } = resources;
 
+        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
+            conf,
+            tenant_shard_id,
+            walredo_process_pool,
+        )));
         let attach_mode = attached_conf.location.attach_mode;
         let generation = attached_conf.location.generation;
 

@@ -1973,8 +1975,11 @@ impl Tenant {
         self.generation
     }
 
-    pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
-        self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
+    pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
+        let Some(mgr) = self.walredo_mgr.as_ref() else {
+            return None;
+        };
+        mgr.status().await
     }
 
     /// Changes tenant status to active, unless shutdown was already requested.
@@ -21,7 +21,6 @@ use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 
-use remote_storage::GenericRemoteStorage;
 use utils::crashsafe;
 
 use crate::config::PageServerConf;

@@ -1661,9 +1660,7 @@ pub(crate) async fn load_tenant(
     conf: &'static PageServerConf,
     tenant_id: TenantId,
     generation: Generation,
-    broker_client: storage_broker::BrokerClientChannel,
-    remote_storage: Option<GenericRemoteStorage>,
-    deletion_queue_client: DeletionQueueClient,
+    resources: TenantSharedResources,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
     // This is a legacy API (replaced by `/location_conf`). It does not support sharding

@@ -1682,12 +1679,6 @@ pub(crate) async fn load_tenant(
         })?;
     }
 
-    let resources = TenantSharedResources {
-        broker_client,
-        remote_storage,
-        deletion_queue_client,
-    };
-
     let mut location_conf =
         Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
     location_conf.attach_in_generation(generation);
@@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
         // Perhaps we did no work and the walredo process has been idle for some time:
         // give it a chance to shut down to avoid leaving walredo process running indefinitely.
         if let Some(walredo_mgr) = &tenant.walredo_mgr {
-            walredo_mgr.maybe_quiesce(period * 10);
+            walredo_mgr.maybe_quiesce(period * 10).await;
         }
 
         // Sleep
@@ -21,12 +21,16 @@
 /// Process lifecycle and abstraction for the IPC protocol.
 mod process;
 
+mod process_pool;
+pub use process_pool::Pool as ProcessPool;
+use utils::sync::heavier_once_cell;
+
 /// Code to apply [`NeonWalRecord`]s.
 mod apply_neon;
 
 use crate::config::PageServerConf;
 use crate::metrics::{
-    WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
+    WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
     WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
 };
 use crate::repository::Key;

@@ -36,12 +40,14 @@ use bytes::{Bytes, BytesMut};
 use pageserver_api::key::key_to_rel_block;
 use pageserver_api::models::WalRedoManagerStatus;
 use pageserver_api::shard::TenantShardId;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
 use tracing::*;
 use utils::lsn::Lsn;
 
+use self::process::WalRedoProcess;
+
 ///
 /// This is the real implementation that uses a Postgres process to
 /// perform WAL replay. Only one thread can use the process at a time,

@@ -53,7 +59,8 @@ pub struct PostgresRedoManager {
     tenant_shard_id: TenantShardId,
     conf: &'static PageServerConf,
     last_redo_at: std::sync::Mutex<Option<Instant>>,
-    redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
+    redo_process: heavier_once_cell::OnceCell<Arc<TaintedProcess>>,
+    pool: Arc<ProcessPool>,
 }
 
 ///

@@ -101,6 +108,7 @@ impl PostgresRedoManager {
                     self.conf.wal_redo_timeout,
                     pg_version,
                 )
+                .await
             };
             img = Some(result?);
 

@@ -121,10 +129,11 @@ impl PostgresRedoManager {
                 self.conf.wal_redo_timeout,
                 pg_version,
             )
+            .await
         }
     }
 
-    pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
+    pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
         Some(WalRedoManagerStatus {
             last_redo_at: {
                 let at = *self.last_redo_at.lock().unwrap();

@@ -134,11 +143,36 @@ impl PostgresRedoManager {
                     chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
                 })
             },
-            pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
+            pid: self.redo_process.get_mut().await.map(|p| p.id()),
         })
     }
 }
 
+struct TaintedProcess {
+    tenant_shard_id: TenantShardId,
+    process: Option<Box<WalRedoProcess>>,
+}
+
+impl std::ops::Deref for TaintedProcess {
+    type Target = WalRedoProcess;
+
+    fn deref(&self) -> &Self::Target {
+        self.process
+            .as_ref()
+            .expect("only Self::drop sets it to None")
+    }
+}
+
+impl Drop for TaintedProcess {
+    fn drop(&mut self) {
+        // ensure tenant_id and span_id are in span
+        let span = info_span!("walredo", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug());
+        let _entered = span.enter();
+        let process = self.process.take().expect("we are the only takers");
+        drop(process);
+    }
+}
+
 impl PostgresRedoManager {
     ///
     /// Create a new PostgresRedoManager.

@@ -146,36 +180,49 @@ impl PostgresRedoManager {
     pub fn new(
         conf: &'static PageServerConf,
         tenant_shard_id: TenantShardId,
+        pool: Arc<process_pool::Pool>,
     ) -> PostgresRedoManager {
         // The actual process is launched lazily, on first request.
         PostgresRedoManager {
             tenant_shard_id,
             conf,
             last_redo_at: std::sync::Mutex::default(),
-            redo_process: RwLock::new(None),
+            redo_process: heavier_once_cell::OnceCell::default(),
+            pool,
         }
     }
 
     /// This type doesn't have its own background task to check for idleness: we
     /// rely on our owner calling this function periodically in its own housekeeping
     /// loops.
-    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
+    pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
         if let Ok(g) = self.last_redo_at.try_lock() {
             if let Some(last_redo_at) = *g {
                 if last_redo_at.elapsed() >= idle_timeout {
                     drop(g);
-                    let mut guard = self.redo_process.write().unwrap();
-                    *guard = None;
-                }
-            }
-        }
+                    // fallthrough
+                } else {
+                    return;
+                }
+            } else {
+                return;
+            }
+        } else {
+            return;
+        }
+        drop(
+            self.redo_process
+                .get_mut()
+                .await
+                .map(|mut guard| guard.take_and_deinit()),
+        );
     }
 
     ///
     /// Process one request for WAL redo using wal-redo postgres
     ///
     #[allow(clippy::too_many_arguments)]
-    fn apply_batch_postgres(
+    async fn apply_batch_postgres(
         &self,
         key: Key,
         lsn: Lsn,
@@ -192,46 +239,29 @@ impl PostgresRedoManager {
         let mut n_attempts = 0u32;
         loop {
-            // launch the WAL redo process on first use
-            let proc: Arc<process::WalRedoProcess> = {
-                let proc_guard = self.redo_process.read().unwrap();
-                match &*proc_guard {
-                    None => {
-                        // "upgrade" to write lock to launch the process
-                        drop(proc_guard);
-                        let mut proc_guard = self.redo_process.write().unwrap();
-                        match &*proc_guard {
-                            None => {
-                                let start = Instant::now();
-                                let proc = Arc::new(
-                                    process::WalRedoProcess::launch(
-                                        self.conf,
-                                        self.tenant_shard_id,
-                                        pg_version,
-                                    )
-                                    .context("launch walredo process")?,
-                                );
-                                let duration = start.elapsed();
-                                WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
-                                    .observe(duration.as_secs_f64());
-                                info!(
-                                    duration_ms = duration.as_millis(),
-                                    pid = proc.id(),
-                                    "launched walredo process"
-                                );
-                                *proc_guard = Some(Arc::clone(&proc));
-                                proc
-                            }
-                            Some(proc) => Arc::clone(proc),
-                        }
-                    }
-                    Some(proc) => Arc::clone(proc),
-                }
-            };
+            let proc_once_cell_guard_ref = self
+                .redo_process
+                .get_or_init(|init_permit| async move {
+                    let start = Instant::now();
+                    let proc = Arc::new(TaintedProcess {
+                        tenant_shard_id: self.tenant_shard_id,
+                        process: Some(self.pool.get(pg_version).await?),
+                    });
+                    let duration = start.elapsed();
+                    WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO.observe(duration.as_secs_f64());
+                    info!(
+                        duration_ms = duration.as_millis(),
+                        pid = proc.id(),
+                        "acquired pre-spawned walredo process"
+                    );
+                    anyhow::Ok((proc, init_permit))
+                })
+                .await?;
 
             let started_at = std::time::Instant::now();
 
             // Relational WAL records are applied using wal-redo-postgres
-            let result = proc
+            let result = proc_once_cell_guard_ref
                 .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
                 .context("apply_wal_records");
 

@@ -274,33 +304,30 @@ impl PostgresRedoManager {
                 );
                 // Avoid concurrent callers hitting the same issue.
                 // We can't prevent it from happening because we want to enable parallelism.
-                {
-                    let mut guard = self.redo_process.write().unwrap();
-                    match &*guard {
-                        Some(current_field_value) => {
-                            if Arc::ptr_eq(current_field_value, &proc) {
-                                // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
-                                *guard = None;
-                            }
-                        }
-                        None => {
-                            // Another thread was faster to observe the error, and already took the process out of rotation.
-                        }
-                    }
-                }
+                let proc_clone = Arc::clone(&proc_once_cell_guard_ref);
+                drop(proc_once_cell_guard_ref); // otherwise, the .get_mut() in the next line would deadlock with us holding the guard
+                match self.redo_process.get_mut().await {
+                    Some(mut guard) => {
+                        if Arc::ptr_eq(&*guard, &proc_clone) {
+                            // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
+                            drop(guard.take_and_deinit());
+                        }
+                    }
+                    None => {
+                        // Another thread was faster to observe the error, and already took the process out of rotation.
+                    }
+                }
                 // NB: there may still be other concurrent threads using `proc`.
                 // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
-                // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
-                // holding the lock while waiting for the process to exit.
                 // NB: the drop impl blocks the current threads with a wait() system call for
-                // the child process. We dropped the `guard` above so that other threads aren't
-                // affected. But, it's good that the current thread _does_ block to wait.
-                // If we instead deferred the waiting into the background / to tokio, it could
-                // happen that if walredo always fails immediately, we spawn processes faster
-                // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
-                // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
+                // the child process. We usually avoid stalling the executor thread that way,
+                // but, here it's actually somewhat good. If we instead deferred the waiting
+                // into the background / to tokio, it could happen that if walredo always fails
+                // immediately, we spawn processes faster than we can SIGKILL & `wait` for them to exit.
+                // By doing it the way we do here, we limit this risk of run-away to at most
+                // $num_runtimes * $num_executor_threads.
                 // This probably needs revisiting at some later point.
-                drop(proc);
+                drop(proc_clone);
             } else if n_attempts != 0 {
                 info!(n_attempts, "retried walredo succeeded");
             }
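The two hunks above move process acquisition into `heavier_once_cell::OnceCell`, an internal neon utility whose exact API (`get_or_init` with an init permit, `get_mut`, `take_and_deinit`) is not shown in this diff. As a rough, hypothetical analogy of the same idea, "acquire lazily once, and evict on error only if the cached handle is still the one that failed", using only std and tokio types (not neon's implementation):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

/// Stand-in for the pooled walredo process handle (hypothetical).
struct Process {
    pid: u32,
}

struct Manager {
    // Rough analogue of `heavier_once_cell::OnceCell<Arc<TaintedProcess>>`:
    // None = not acquired yet (or evicted after an error), Some = cached handle.
    slot: Mutex<Option<Arc<Process>>>,
}

impl Manager {
    /// Lazily acquire the process on first use, reusing the cached one afterwards.
    async fn get(&self) -> Arc<Process> {
        let mut slot = self.slot.lock().await;
        if let Some(p) = &*slot {
            return Arc::clone(p);
        }
        // In the pageserver this would be `self.pool.get(pg_version).await?`.
        let p = Arc::new(Process { pid: 42 });
        *slot = Some(Arc::clone(&p));
        p
    }

    /// Evict the cached process only if it is still the one that failed,
    /// mirroring the `Arc::ptr_eq` check in the error path above.
    async fn evict_if_current(&self, failed: &Arc<Process>) {
        let mut slot = self.slot.lock().await;
        if let Some(current) = &*slot {
            if Arc::ptr_eq(current, failed) {
                *slot = None;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mgr = Manager { slot: Mutex::new(None) };
    let proc = mgr.get().await;
    println!("acquired pid {}", proc.pid);
    // Pretend `proc` hit an error: take it out of rotation so the next get() re-acquires.
    mgr.evict_if_current(&proc).await;
    assert!(mgr.slot.lock().await.is_none());
}
```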
@@ -380,7 +407,7 @@ mod tests {
     async fn short_v14_redo() {
         let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
 
-        let h = RedoHarness::new().unwrap();
+        let h = RedoHarness::new().await.unwrap();
 
         let page = h
             .manager

@@ -407,7 +434,7 @@ mod tests {
 
     #[tokio::test]
     async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
-        let h = RedoHarness::new().unwrap();
+        let h = RedoHarness::new().await.unwrap();
 
         let page = h
             .manager

@@ -437,7 +464,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_stderr() {
-        let h = RedoHarness::new().unwrap();
+        let h = RedoHarness::new().await.unwrap();
         h
             .manager
             .request_redo(

@@ -480,7 +507,7 @@ mod tests {
     }
 
     impl RedoHarness {
-        fn new() -> anyhow::Result<Self> {
+        async fn new() -> anyhow::Result<Self> {
             crate::tenant::harness::setup_logging();
 
             let repo_dir = camino_tempfile::tempdir()?;

@@ -488,7 +515,9 @@ mod tests {
             let conf = Box::leak(Box::new(conf));
             let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
 
-            let manager = PostgresRedoManager::new(conf, tenant_shard_id);
+            let pool = crate::walredo::process_pool::Pool::launch(conf).await;
+
+            let manager = PostgresRedoManager::new(conf, tenant_shard_id, pool);
 
             Ok(RedoHarness {
                 _repo_dir: repo_dir,

@@ -500,4 +529,11 @@ mod tests {
             tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
         }
     }
+
+    impl Drop for RedoHarness {
+        fn drop(&mut self) {
+            self.span()
+                .in_scope(|| tracing::info!("RedoHarness dropping"));
+        }
+    }
 }
@@ -7,7 +7,7 @@ use crate::{
 use anyhow::Context;
 use bytes::Bytes;
 use nix::poll::{PollFd, PollFlags};
-use pageserver_api::{reltag::RelTag, shard::TenantShardId};
+use pageserver_api::reltag::RelTag;
 use postgres_ffi::BLCKSZ;
 use std::os::fd::AsRawFd;
 #[cfg(feature = "testing")]

@@ -29,7 +29,6 @@ mod protocol;
 pub struct WalRedoProcess {
     #[allow(dead_code)]
     conf: &'static PageServerConf,
-    tenant_shard_id: TenantShardId,
     // Some() on construction, only becomes None on Drop.
     child: Option<NoLeakChild>,
     stdout: Mutex<ProcessOutput>,

@@ -55,13 +54,8 @@ impl WalRedoProcess {
     // Start postgres binary in special WAL redo mode.
     //
     #[instrument(skip_all,fields(pg_version=pg_version))]
-    pub(crate) fn launch(
-        conf: &'static PageServerConf,
-        tenant_shard_id: TenantShardId,
-        pg_version: u32,
-    ) -> anyhow::Result<Self> {
+    pub(crate) fn launch(conf: &'static PageServerConf, pg_version: u32) -> anyhow::Result<Self> {
         crate::span::debug_assert_current_span_has_tenant_id();
 
         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

@@ -70,9 +64,6 @@ impl WalRedoProcess {
         let child = Command::new(pg_bin_dir_path.join("postgres"))
             // the first arg must be --wal-redo so the child process enters into walredo mode
             .arg("--wal-redo")
-            // the child doesn't process this arg, but, having it in the argv helps indentify the
-            // walredo process for a particular tenant when debugging a pagserver
-            .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
             .stdin(Stdio::piped())
             .stderr(Stdio::piped())
             .stdout(Stdio::piped())

@@ -87,7 +78,7 @@ impl WalRedoProcess {
             // the files it opens, and
             // 2. to use seccomp to sandbox itself before processing the first
             // walredo request.
-            .spawn_no_leak_child(tenant_shard_id)
+            .spawn_no_leak_child()
             .context("spawn process")?;
         WAL_REDO_PROCESS_COUNTERS.started.inc();
         let mut child = scopeguard::guard(child, |child| {

@@ -148,12 +139,11 @@ impl WalRedoProcess {
                     error!(error=?e, "failed to read from walredo stderr");
                 }
             }
-        }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
+        }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), %pg_version))
         );
 
         Ok(Self {
             conf,
-            tenant_shard_id,
             child: Some(child),
             stdin: Mutex::new(ProcessInput {
                 stdin,

@@ -179,7 +169,7 @@ impl WalRedoProcess {
     // Apply given WAL records ('records') over an old page image. Returns
     // new page image.
     //
-    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
+    #[instrument(skip_all, fields(pid=%self.id()))]
     pub(crate) fn apply_wal_records(
         &self,
         rel: RelTag,

@@ -376,13 +366,11 @@ impl WalRedoProcess {
             // these files will be collected to an allure report
             let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
 
-            let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
-
             let res = std::fs::OpenOptions::new()
                 .write(true)
                 .create_new(true)
                 .read(true)
-                .open(path)
+                .open(&filename)
                 .and_then(|mut f| f.write_all(writebuf));
 
             // trip up allowed_errors
@@ -1,4 +1,5 @@
 use tracing;
+use tracing::debug;
 use tracing::error;
 use tracing::info;
 use tracing::instrument;

@@ -15,12 +16,10 @@ use std::ops::Deref;
 
 use std::process::Child;
 
-use pageserver_api::shard::TenantShardId;
-
 /// Wrapper type around `std::process::Child` which guarantees that the child
 /// will be killed and waited-for by this process before being dropped.
 pub(crate) struct NoLeakChild {
-    pub(crate) tenant_id: TenantShardId,
     pub(crate) child: Option<Child>,
 }

@@ -39,12 +38,9 @@ impl DerefMut for NoLeakChild {
 }
 
 impl NoLeakChild {
-    pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
+    pub(crate) fn spawn(command: &mut Command) -> io::Result<Self> {
         let child = command.spawn()?;
-        Ok(NoLeakChild {
-            tenant_id,
-            child: Some(child),
-        })
+        Ok(NoLeakChild { child: Some(child) })
     }
 
     pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {

@@ -76,7 +72,7 @@ impl NoLeakChild {
             // with the wait().
             error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
         }
-
+        debug!("sent SIGKILL, waiting for child to exit");
         match child.wait() {
             Ok(exit_status) => {
                 info!(exit_status = %exit_status, "wait successful");

@@ -94,7 +90,6 @@ impl Drop for NoLeakChild {
             Some(child) => child,
             None => return,
         };
-        let tenant_shard_id = self.tenant_id;
         // Offload the kill+wait of the child process into the background.
         // If someone stops the runtime, we'll leak the child process.
         // We can ignore that case because we only stop the runtime on pageserver exit.

@@ -102,11 +97,7 @@ impl Drop for NoLeakChild {
         tokio::task::spawn_blocking(move || {
             // Intentionally don't inherit the tracing context from whoever is dropping us.
             // This thread here is going to outlive of our dropper.
-            let span = tracing::info_span!(
-                "walredo",
-                tenant_id = %tenant_shard_id.tenant_id,
-                shard_id = %tenant_shard_id.shard_slug()
-            );
+            let span = tracing::info_span!("walredo");
             let _entered = span.enter();
             Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
         })

@@ -116,11 +107,11 @@ impl Drop for NoLeakChild {
 }
 
 pub(crate) trait NoLeakChildCommandExt {
-    fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
+    fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild>;
 }
 
 impl NoLeakChildCommandExt for Command {
-    fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
-        NoLeakChild::spawn(tenant_id, self)
+    fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild> {
+        NoLeakChild::spawn(self)
     }
 }
pageserver/src/walredo/process_pool.rs (new file, 57 lines)
@@ -0,0 +1,57 @@
use anyhow::Context;
use utils::pre_spawned_pool;

use crate::config::PageServerConf;

use super::process::WalRedoProcess;

pub struct Pool {
    v14: pre_spawned_pool::Client<Box<WalRedoProcess>>,
    v15: pre_spawned_pool::Client<Box<WalRedoProcess>>,
    v16: pre_spawned_pool::Client<Box<WalRedoProcess>>,
}

struct Launcher {
    pg_version: u32,
    conf: &'static PageServerConf,
}

impl utils::pre_spawned_pool::Launcher<Box<WalRedoProcess>> for Launcher {
    fn create(&self) -> anyhow::Result<Box<WalRedoProcess>> {
        Ok(Box::new(WalRedoProcess::launch(
            self.conf,
            self.pg_version,
        )?))
    }
}

impl Pool {
    pub async fn launch(conf: &'static PageServerConf) -> Self {
        Self {
            v14: pre_spawned_pool::Pool::launch(Launcher {
                pg_version: 14,
                conf,
            })
            .await,
            v15: pre_spawned_pool::Pool::launch(Launcher {
                pg_version: 15,
                conf,
            })
            .await,
            v16: pre_spawned_pool::Pool::launch(Launcher {
                pg_version: 16,
                conf,
            })
            .await,
        }
    }

    pub async fn get(&self, pg_version: u32) -> anyhow::Result<Box<WalRedoProcess>> {
        let pool = match pg_version {
            14 => &self.v14,
            15 => &self.v15,
            16 => &self.v16,
            x => anyhow::bail!("unknown pg version: {x}"),
        };
        pool.get().await.context("get pre-spawned walredo process")
    }
}