diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 773e5fc051..049a6d445d 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -36,11 +36,12 @@ use bytes::{Bytes, BytesMut};
use pageserver_api::key::key_to_rel_block;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::TenantShardId;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
+use utils::sync::heavier_once_cell;
///
/// This is the real implementation that uses a Postgres process to
@@ -53,7 +54,7 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
- redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
+ redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
}
///
@@ -101,6 +102,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
+ .await
};
img = Some(result?);
@@ -121,6 +123,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
+ .await
}
}
@@ -134,7 +137,7 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
- pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
+ pid: self.redo_process.get().map(|p| p.id()),
})
}
}
@@ -152,7 +155,7 @@ impl PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
- redo_process: RwLock::new(None),
+ redo_process: heavier_once_cell::OnceCell::default(),
}
}
@@ -164,8 +167,11 @@ impl PostgresRedoManager {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
- let mut guard = self.redo_process.write().unwrap();
- *guard = None;
+ drop(
+ self.redo_process
+ .get()
+ .map(|mut guard| guard.take_and_deinit()),
+ );
}
}
}
@@ -175,7 +181,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
- fn apply_batch_postgres(
+ async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -192,41 +198,28 @@ impl PostgresRedoManager {
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
- let proc: Arc<process::WalRedoProcess> = {
- let proc_guard = self.redo_process.read().unwrap();
- match &*proc_guard {
- None => {
- // "upgrade" to write lock to launch the process
- drop(proc_guard);
- let mut proc_guard = self.redo_process.write().unwrap();
- match &*proc_guard {
- None => {
- let start = Instant::now();
- let proc = Arc::new(
- process::WalRedoProcess::launch(
- self.conf,
- self.tenant_shard_id,
- pg_version,
- )
- .context("launch walredo process")?,
- );
- let duration = start.elapsed();
- WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
- .observe(duration.as_secs_f64());
- info!(
- duration_ms = duration.as_millis(),
- pid = proc.id(),
- "launched walredo process"
- );
- *proc_guard = Some(Arc::clone(&proc));
- proc
- }
- Some(proc) => Arc::clone(proc),
- }
- }
- Some(proc) => Arc::clone(proc),
- }
- };
+ let proc = self
+ .redo_process
+ .get_or_init(|init_permit| async move {
+ let start = Instant::now();
+ let proc = Arc::new(
+ process::WalRedoProcess::launch(
+ self.conf,
+ self.tenant_shard_id,
+ pg_version,
+ )
+ .context("launch walredo process")?,
+ );
+ let duration = start.elapsed();
+ WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
+ info!(
+ duration_ms = duration.as_millis(),
+ pid = proc.id(),
+ "launched walredo process"
+ );
+ anyhow::Ok((proc, init_permit))
+ })
+ .await?;
let started_at = std::time::Instant::now();
@@ -275,12 +268,11 @@ impl PostgresRedoManager {
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
{
- let mut guard = self.redo_process.write().unwrap();
- match &*guard {
- Some(current_field_value) => {
- if Arc::ptr_eq(current_field_value, &proc) {
+ match self.redo_process.get() {
+ Some(mut guard) => {
+ if Arc::ptr_eq(&*guard, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
- *guard = None;
+ drop(guard.take_and_deinit());
}
}
None => {