This commit is contained in:
Christian Schwarz
2024-02-02 14:26:26 +00:00
parent 014147a644
commit 8e8890530c
5 changed files with 51 additions and 18 deletions

16
Cargo.lock generated
View File

@@ -196,6 +196,19 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener 4.0.0",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-compression"
version = "0.4.5"
@@ -2355,7 +2368,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad"
dependencies = [
"anyhow",
"async-channel",
"async-channel 1.9.0",
"base64 0.13.1",
"futures-lite",
"infer",
@@ -6241,6 +6254,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-channel 2.1.1",
"async-trait",
"bincode",
"byteorder",

View File

@@ -23,7 +23,7 @@ enum Command {
SetSlotCount(usize),
}
enum GetError {
pub enum GetError {
ShuttingDown,
}

View File

@@ -27,7 +27,6 @@ mod protocol;
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,

View File

@@ -90,7 +90,6 @@ impl Drop for NoLeakChild {
Some(child) => child,
None => return,
};
let tenant_shard_id = self.tenant_id;
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
@@ -98,11 +97,7 @@ impl Drop for NoLeakChild {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive of our dropper.
let span = tracing::info_span!(
"walredo",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
);
let span = tracing::info_span!("walredo");
let _entered = span.enter();
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
})
@@ -112,11 +107,11 @@ impl Drop for NoLeakChild {
}
pub(crate) trait NoLeakChildCommandExt {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild>;
}
impl NoLeakChildCommandExt for Command {
fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(tenant_id, self)
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(self)
}
}

View File

@@ -7,9 +7,9 @@ use crate::config::PageServerConf;
use super::process::WalRedoProcess;
pub struct Pool {
v14: pre_spawned_pool::Pool<Arc<WalRedoProcess>, Launcher>,
v15: pre_spawned_pool::Pool<Arc<WalRedoProcess>, Launcher>,
v16: pre_spawned_pool::Pool<Arc<WalRedoProcess>, Launcher>,
v14: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
v15: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
v16: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
}
struct Launcher {
@@ -17,14 +17,39 @@ struct Launcher {
conf: &'static PageServerConf,
}
impl utils::pre_spawned_pool::Launcher<Arc<WalRedoProcess>> for Launcher{
impl utils::pre_spawned_pool::Launcher<Arc<WalRedoProcess>> for Launcher {
fn create(&self) -> anyhow::Result<Arc<WalRedoProcess>> {
WalRedoProcess::launch(self.conf, self.pg_version)
Ok(Arc::new(WalRedoProcess::launch(
self.conf,
self.pg_version,
)?))
}
}
impl Pool {
pub fn get(&self, pg_version: usize) -> anyhow::Result<Arc<WalRedoProcess>> {
pub async fn launch(conf: &'static PageServerConf) -> Self {
Self {
v14: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 14,
conf,
})
.await,
v15: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 15,
conf,
})
.await,
v16: pre_spawned_pool::Pool::launch(Launcher {
pg_version: 16,
conf,
})
.await,
}
}
pub fn get(
&self,
pg_version: usize,
) -> Result<Arc<WalRedoProcess>, pre_spawned_pool::GetError> {
let pool = match pg_version {
14 => &self.v14,
15 => &self.v15,