This commit is contained in:
Christian Schwarz
2024-02-02 14:42:00 +00:00
parent 8e8890530c
commit 8fe3c9ff55
3 changed files with 38 additions and 17 deletions

View File

@@ -187,6 +187,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub walredo_process_pool: Arc<walredo::ProcessPool>,
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
@@ -614,14 +615,19 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(conf)));
let TenantSharedResources {
walredo_process_pool,
broker_client,
remote_storage,
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
resources.walredo_process_pool,
)));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
conf,

View File

@@ -22,6 +22,7 @@
mod process;
mod process_pool;
pub(crate) use process_pool::Pool as ProcessPool;
/// Code to apply [`NeonWalRecord`]s.
mod apply_neon;
@@ -44,6 +45,8 @@ use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
use self::process::WalRedoProcess;
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
@@ -55,7 +58,7 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
redo_process: RwLock<Option<Arc<TaintedProcess>>>,
}
///
@@ -141,6 +144,21 @@ impl PostgresRedoManager {
}
}
struct TaintedProcess {
tenant_shard_id: TenantShardId,
process: Option<Box<WalRedoProcess>>,
}
impl Drop for TaintedProcess {
fn drop(&mut self) {
// ensure tenant_id and span_id are in span
let span = info_span!("walredo", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug());
let _entered = span.enter();
let process = self.process.take().expect("we are the only takers");
drop(process);
}
}
impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
@@ -205,14 +223,11 @@ impl PostgresRedoManager {
match &*proc_guard {
None => {
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
let pool: Arc<process_pool::Pool> = todo!();
let proc = Arc::new(TaintedProcess {
tenant_shard_id: self.tenant_shard_id,
process: Some(pool.get(pg_version))?,
});
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
.observe(duration.as_secs_f64());

View File

@@ -7,9 +7,9 @@ use crate::config::PageServerConf;
use super::process::WalRedoProcess;
pub struct Pool {
v14: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
v15: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
v16: pre_spawned_pool::Client<Arc<WalRedoProcess>>,
v14: pre_spawned_pool::Client<Box<WalRedoProcess>>,
v15: pre_spawned_pool::Client<Box<WalRedoProcess>>,
v16: pre_spawned_pool::Client<Box<WalRedoProcess>>,
}
struct Launcher {
@@ -17,8 +17,8 @@ struct Launcher {
conf: &'static PageServerConf,
}
impl utils::pre_spawned_pool::Launcher<Arc<WalRedoProcess>> for Launcher {
fn create(&self) -> anyhow::Result<Arc<WalRedoProcess>> {
impl utils::pre_spawned_pool::Launcher<Box<WalRedoProcess>> for Launcher {
fn create(&self) -> anyhow::Result<Box<WalRedoProcess>> {
Ok(Arc::new(WalRedoProcess::launch(
self.conf,
self.pg_version,
@@ -49,7 +49,7 @@ impl Pool {
pub fn get(
&self,
pg_version: usize,
) -> Result<Arc<WalRedoProcess>, pre_spawned_pool::GetError> {
) -> Result<Box<WalRedoProcess>, pre_spawned_pool::GetError> {
let pool = match pg_version {
14 => &self.v14,
15 => &self.v15,