Revert "switch to tokio::RwLock"

This reverts commit e8f1af5527.
This commit is contained in:
Christian Schwarz
2024-02-02 16:43:08 +00:00
parent e8f1af5527
commit 1102d3f0bf
4 changed files with 20 additions and 32 deletions

View File

@@ -967,7 +967,7 @@ async fn tenant_status(
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
},
walredo: tenant.wal_redo_manager_status().await,
walredo: tenant.wal_redo_manager_status(),
timelines: tenant.list_timeline_ids(),
})
}

View File

@@ -332,9 +332,9 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
@@ -366,9 +366,9 @@ impl WalRedoManager {
}
}
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(m) => m.status().await,
WalRedoManager::Prod(m) => m.status(),
#[cfg(test)]
WalRedoManager::Test(_) => None,
}
@@ -1962,11 +1962,8 @@ impl Tenant {
self.generation
}
pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
let Some(mgr) = self.walredo_mgr.as_ref() else {
return None;
};
mgr.status().await
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
}
/// Changes tenant status to active, unless shutdown was already requested.

View File

@@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10).await;
walredo_mgr.maybe_quiesce(period * 10);
}
// Sleep

View File

@@ -36,7 +36,7 @@ use bytes::{Bytes, BytesMut};
use pageserver_api::key::key_to_rel_block;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::TenantShardId;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
@@ -53,7 +53,7 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: tokio::sync::RwLock<Option<Arc<process::WalRedoProcess>>>,
redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
}
///
@@ -101,7 +101,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -122,11 +121,10 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
Some(WalRedoManagerStatus {
last_redo_at: {
let at = *self.last_redo_at.lock().unwrap();
@@ -136,7 +134,7 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
pid: self.redo_process.read().await.as_ref().map(|p| p.id()),
pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
})
}
}
@@ -154,37 +152,30 @@ impl PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: tokio::sync::RwLock::new(None),
redo_process: RwLock::new(None),
}
}
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
// fallthrough
} else {
return;
let mut guard = self.redo_process.write().unwrap();
*guard = None;
}
} else {
return;
}
} else {
return;
}
let mut guard = self.redo_process.write().await;
*guard = None;
}
///
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
async fn apply_batch_postgres(
fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -202,12 +193,12 @@ impl PostgresRedoManager {
loop {
// launch the WAL redo process on first use
let proc: Arc<process::WalRedoProcess> = {
let proc_guard = self.redo_process.read().await;
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
// "upgrade" to write lock to launch the process
drop(proc_guard);
let mut proc_guard = self.redo_process.write().await;
let mut proc_guard = self.redo_process.write().unwrap();
match &*proc_guard {
None => {
let start = Instant::now();
@@ -284,7 +275,7 @@ impl PostgresRedoManager {
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
{
let mut guard = self.redo_process.write().await;
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {