heavier_once_cell: switch to tokio::sync::RwLock

Using the RwLock reduces contention on the hot path.
This commit is contained in:
Christian Schwarz
2024-02-02 17:00:49 +00:00
parent c29532cded
commit 0a09cff816
7 changed files with 151 additions and 65 deletions

View File

@@ -1,6 +1,6 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
Arc,
};
use tokio::sync::Semaphore;
@@ -12,7 +12,7 @@ use tokio::sync::Semaphore;
///
/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
pub struct OnceCell<T> {
inner: Mutex<Inner<T>>,
inner: tokio::sync::RwLock<Inner<T>>,
initializers: AtomicUsize,
}
@@ -50,7 +50,7 @@ impl<T> OnceCell<T> {
let sem = Semaphore::new(1);
sem.close();
Self {
inner: Mutex::new(Inner {
inner: tokio::sync::RwLock::new(Inner {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
@@ -61,18 +61,18 @@ impl<T> OnceCell<T> {
/// Returns a guard to an existing initialized value, or uniquely initializes the value before
/// returning the guard.
///
/// Initializing might wait on any existing [`Guard::take_and_deinit`] deinitialization.
/// Initializing might wait on any existing [`GuardMut::take_and_deinit`] deinitialization.
///
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
pub async fn get_mut_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardMut<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
let guard = self.inner.write().await;
if guard.value.is_some() {
return Ok(Guard(guard));
return Ok(GuardMut(guard));
}
guard.init_semaphore.clone()
};
@@ -88,29 +88,72 @@ impl<T> OnceCell<T> {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.lock().unwrap();
let guard = self.inner.write().await;
Ok(Self::set0(value, guard))
}
Err(_closed) => {
let guard = self.inner.lock().unwrap();
let guard = self.inner.write().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
return Ok(GuardMut(guard));
}
}
}
/// Assuming a permit is held after previous call to [`Guard::take_and_deinit`], it can be used
/// Returns a guard to an existing initialized value, or uniquely initializes the value before
/// returning the guard.
///
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardRef<'_, T>, E>
where
F: FnOnce(InitPermit) -> Fut,
Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
{
let sem = {
let guard = self.inner.read().await;
if guard.value.is_some() {
return Ok(GuardRef(guard));
}
guard.init_semaphore.clone()
};
let permit = {
// increment the count for the duration of queued
let _guard = CountWaitingInitializers::start(self);
sem.acquire_owned().await
};
match permit {
Ok(permit) => {
let permit = InitPermit(permit);
let (value, _permit) = factory(permit).await?;
let guard = self.inner.write().await;
Ok(Self::set0(value, guard).downgrade())
}
Err(_closed) => {
let guard = self.inner.read().await;
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(GuardRef(guard));
}
}
}
/// Assuming a permit is held after previous call to [`GuardMut::take_and_deinit`], it can be used
/// to complete initializing the inner value.
///
/// # Panics
///
/// If the inner has already been initialized.
pub fn set(&self, value: T, _permit: InitPermit) -> Guard<'_, T> {
let guard = self.inner.lock().unwrap();
pub async fn set(&self, value: T, _permit: InitPermit) -> GuardMut<'_, T> {
let guard = self.inner.write().await;
// cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
// give more permits right now.
@@ -122,21 +165,31 @@ impl<T> OnceCell<T> {
Self::set0(value, guard)
}
fn set0(value: T, mut guard: std::sync::MutexGuard<'_, Inner<T>>) -> Guard<'_, T> {
fn set0(value: T, mut guard: tokio::sync::RwLockWriteGuard<'_, Inner<T>>) -> GuardMut<'_, T> {
if guard.value.is_some() {
drop(guard);
unreachable!("we won permit, must not be initialized");
}
guard.value = Some(value);
guard.init_semaphore.close();
Guard(guard)
GuardMut(guard)
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
pub async fn get_mut(&self) -> Option<GuardMut<'_, T>> {
let guard = self.inner.write().await;
if guard.value.is_some() {
Some(Guard(guard))
Some(GuardMut(guard))
} else {
None
}
}
/// Returns a guard to an existing initialized value, if any.
pub async fn get(&self) -> Option<GuardRef<'_, T>> {
let guard = self.inner.read().await;
if guard.value.is_some() {
Some(GuardRef(guard))
} else {
None
}
@@ -168,9 +221,9 @@ impl<'a, T> Drop for CountWaitingInitializers<'a, T> {
/// Uninteresting guard object to allow short-lived access to inspect or clone the held,
/// initialized value.
#[derive(Debug)]
pub struct Guard<'a, T>(MutexGuard<'a, Inner<T>>);
pub struct GuardMut<'a, T>(tokio::sync::RwLockWriteGuard<'a, Inner<T>>);
impl<T> std::ops::Deref for Guard<'_, T> {
impl<T> std::ops::Deref for GuardMut<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
@@ -181,7 +234,7 @@ impl<T> std::ops::Deref for Guard<'_, T> {
}
}
impl<T> std::ops::DerefMut for Guard<'_, T> {
impl<T> std::ops::DerefMut for GuardMut<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
.value
@@ -190,7 +243,7 @@ impl<T> std::ops::DerefMut for Guard<'_, T> {
}
}
impl<'a, T> Guard<'a, T> {
impl<'a, T> GuardMut<'a, T> {
/// Take the current value, and a new permit for it's deinitialization.
///
/// The permit will be on a semaphore part of the new internal value, and any following
@@ -208,6 +261,24 @@ impl<'a, T> Guard<'a, T> {
.map(|v| (v, InitPermit(permit)))
.expect("guard is not created unless value has been initialized")
}
pub fn downgrade(self) -> GuardRef<'a, T> {
GuardRef(self.0.downgrade())
}
}
#[derive(Debug)]
pub struct GuardRef<'a, T>(tokio::sync::RwLockReadGuard<'a, Inner<T>>);
impl<T> std::ops::Deref for GuardRef<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0
.value
.as_ref()
.expect("guard is not created unless value has been initialized")
}
}
/// Type held by OnceCell (de)initializing task.
@@ -248,7 +319,7 @@ mod tests {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|permit| {
.get_mut_or_init(|permit| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
@@ -295,7 +366,11 @@ mod tests {
let cell = cell.clone();
let deinitialization_started = deinitialization_started.clone();
async move {
let (answer, _permit) = cell.get().expect("initialized to value").take_and_deinit();
let (answer, _permit) = cell
.get_mut()
.await
.expect("initialized to value")
.take_and_deinit();
assert_eq!(answer, initial);
deinitialization_started.wait().await;
@@ -306,7 +381,7 @@ mod tests {
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) })
cell.get_mut_or_init(|permit| async { Ok::<_, Infallible>((reinit, permit)) })
.await
.unwrap();
@@ -318,21 +393,21 @@ mod tests {
jh.await.unwrap();
assert_eq!(*cell.get().unwrap(), reinit);
assert_eq!(*cell.get_mut().unwrap(), reinit);
}
#[test]
fn reinit_with_deinit_permit() {
let cell = Arc::new(OnceCell::new(42));
let (mol, permit) = cell.get().unwrap().take_and_deinit();
let (mol, permit) = cell.get_mut().await.unwrap().take_and_deinit();
cell.set(5, permit);
assert_eq!(*cell.get().unwrap(), 5);
assert_eq!(*cell.get_mut().unwrap(), 5);
let (five, permit) = cell.get().unwrap().take_and_deinit();
let (five, permit) = cell.get_mut().unwrap().take_and_deinit();
assert_eq!(5, five);
cell.set(mol, permit);
assert_eq!(*cell.get().unwrap(), 42);
assert_eq!(*cell.get_mut().unwrap(), 42);
}
#[tokio::test]
@@ -340,13 +415,13 @@ mod tests {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|_permit| async { Err("whatever error") })
cell.get_mut_or_init(|_permit| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) })
.get_mut_or_init(|permit| async { Ok::<_, Infallible>(("finally success", permit)) })
.await
.unwrap();
assert_eq!(*g, "finally success");
@@ -358,7 +433,7 @@ mod tests {
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|permit| async {
let initializer = cell.get_mut_or_init(|permit| async {
barrier.wait().await;
futures::future::pending::<()>().await;
@@ -372,10 +447,10 @@ mod tests {
// now initializer is dropped
assert!(cell.get().is_none());
assert!(cell.get_mut().await.is_none());
let g = cell
.get_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) })
.get_mut_or_init(|permit| async { Ok::<_, Infallible>(("now initialized", permit)) })
.await
.unwrap();
assert_eq!(*g, "now initialized");

View File

@@ -967,7 +967,7 @@ async fn tenant_status(
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
},
walredo: tenant.wal_redo_manager_status(),
walredo: tenant.wal_redo_manager_status().await,
timelines: tenant.list_timeline_ids(),
})
}

View File

@@ -332,9 +332,9 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout).await,
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
@@ -366,9 +366,9 @@ impl WalRedoManager {
}
}
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
match self {
WalRedoManager::Prod(m) => m.status(),
WalRedoManager::Prod(m) => m.status().await,
#[cfg(test)]
WalRedoManager::Test(_) => None,
}
@@ -1962,8 +1962,11 @@ impl Tenant {
self.generation
}
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
pub(crate) async fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
let Some(mgr) = self.walredo_mgr.as_ref() else {
return None;
};
mgr.status().await
}
/// Changes tenant status to active, unless shutdown was already requested.

View File

@@ -299,8 +299,8 @@ impl Layer {
})
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.0.info(reset)
pub(crate) async fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.0.info(reset).await
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
@@ -611,10 +611,10 @@ impl LayerInner {
let mut rx = self.status.subscribe();
let strong = {
match self.inner.get() {
match self.inner.get_mut().await {
Some(mut either) => {
self.wanted_evicted.store(true, Ordering::Relaxed);
either.downgrade()
ResidentOrWantedEvicted::downgrade(&mut either)
}
None => return Err(EvictionError::NotFound),
}
@@ -640,7 +640,7 @@ impl LayerInner {
// use however late (compared to the initial expressing of wanted) as the
// "outcome" now
LAYER_IMPL_METRICS.inc_broadcast_lagged();
match self.inner.get() {
match self.inner.get_mut().await {
Some(_) => Err(EvictionError::Downloaded),
None => Ok(()),
}
@@ -758,7 +758,7 @@ impl LayerInner {
// use the already held initialization permit because it is impossible to hit the
// below paths anymore essentially limiting the max loop iterations to 2.
let (value, init_permit) = download(init_permit).await?;
let mut guard = self.inner.set(value, init_permit);
let mut guard = self.inner.set(value, init_permit).await;
let (strong, _upgraded) = guard
.get_and_upgrade()
.expect("init creates strong reference, we held the init permit");
@@ -766,7 +766,7 @@ impl LayerInner {
}
let (weak, permit) = {
let mut locked = self.inner.get_or_init(download).await?;
let mut locked = self.inner.get_mut_or_init(download).await?;
if let Some((strong, upgraded)) = locked.get_and_upgrade() {
if upgraded {
@@ -986,12 +986,12 @@ impl LayerInner {
}
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
async fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
// this is not accurate: we could have the file locally but there was a cancellation
// and now we are not in sync, or we are currently downloading it.
let remote = self.inner.get().is_none();
let remote = self.inner.get_mut().await.is_none();
let access_stats = self.access_stats.as_api_model(reset);
@@ -1050,7 +1050,7 @@ impl LayerInner {
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
return;
};
match this.evict_blocking(version) {
match tokio::runtime::Handle::current().block_on(this.evict_blocking(version)) {
Ok(()) => LAYER_IMPL_METRICS.inc_completed_evictions(),
Err(reason) => LAYER_IMPL_METRICS.inc_eviction_cancelled(reason),
}
@@ -1058,7 +1058,7 @@ impl LayerInner {
}
}
fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> {
async fn evict_blocking(&self, only_version: usize) -> Result<(), EvictionCancelled> {
// deleted or detached timeline, don't do anything.
let Some(timeline) = self.timeline.upgrade() else {
return Err(EvictionCancelled::TimelineGone);
@@ -1067,7 +1067,7 @@ impl LayerInner {
// to avoid starting a new download while we evict, keep holding on to the
// permit.
let _permit = {
let maybe_downloaded = self.inner.get();
let maybe_downloaded = self.inner.get_mut().await;
let (_weak, permit) = match maybe_downloaded {
Some(mut guard) => {

View File

@@ -200,7 +200,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
walredo_mgr.maybe_quiesce(period * 10).await;
}
// Sleep

View File

@@ -1268,7 +1268,7 @@ impl Timeline {
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = guard.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset));
historic_layers.push(historic_layer.info(reset).await);
}
LayerMapInfo {

View File

@@ -127,7 +127,7 @@ impl PostgresRedoManager {
}
}
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
pub(crate) async fn status(&self) -> Option<WalRedoManagerStatus> {
Some(WalRedoManagerStatus {
last_redo_at: {
let at = *self.last_redo_at.lock().unwrap();
@@ -137,7 +137,7 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
pid: self.redo_process.get().map(|p| p.id()),
pid: self.redo_process.get_mut().await.map(|p| p.id()),
})
}
}
@@ -162,19 +162,27 @@ impl PostgresRedoManager {
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
pub(crate) async fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
drop(
self.redo_process
.get()
.map(|mut guard| guard.take_and_deinit()),
);
// fallthrough
} else {
return;
}
} else {
return;
}
} else {
return;
}
drop(
self.redo_process
.get_mut()
.await
.map(|mut guard| guard.take_and_deinit()),
);
}
///
@@ -268,7 +276,7 @@ impl PostgresRedoManager {
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
{
match self.redo_process.get() {
match self.redo_process.get_mut().await {
Some(mut guard) => {
if Arc::ptr_eq(&*guard, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.