mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
feat: noisier easier to debug completion
This commit is contained in:
@@ -1,16 +1,57 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
|
||||
///
|
||||
/// Can be cloned, moved and kept around in futures as "guard objects".
|
||||
#[derive(Clone)]
|
||||
pub struct Completion(mpsc::Sender<()>);
|
||||
pub struct Completion(Arc<Shared>);
|
||||
|
||||
impl Clone for Completion {
|
||||
#[track_caller]
|
||||
fn clone(&self) -> Self {
|
||||
let in_progress = self
|
||||
.0
|
||||
.completions_in_progress
|
||||
.fetch_add(1, Ordering::Release);
|
||||
assert!(in_progress < usize::MAX - 1);
|
||||
let waiters = Arc::strong_count(&self.0)
|
||||
.checked_sub(in_progress)
|
||||
.unwrap_or(0);
|
||||
let id = self.0.id;
|
||||
let location = std::panic::Location::caller();
|
||||
tracing::info!(id, waiters, in_progress, %location, "cloning");
|
||||
Self(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Completion {
|
||||
fn drop(&mut self) {
|
||||
let now = self
|
||||
.0
|
||||
.completions_in_progress
|
||||
.fetch_update(Ordering::Release, Ordering::Relaxed, |x| x.checked_sub(1))
|
||||
.expect("should not have underflown");
|
||||
|
||||
if now == 0 {
|
||||
let count = Arc::strong_count(&self.0);
|
||||
let id = self.0.id;
|
||||
tracing::info!(id, waiters = count - 1, "notifying waiters");
|
||||
self.0.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Barrier will wait until all clones of [`Completion`] have been dropped.
|
||||
#[derive(Clone)]
|
||||
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
|
||||
pub struct Barrier(Arc<Shared>);
|
||||
|
||||
struct Shared {
|
||||
id: usize,
|
||||
notify: tokio::sync::Notify,
|
||||
completions_in_progress: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Default for Barrier {
|
||||
fn default() -> Self {
|
||||
@@ -21,7 +62,27 @@ impl Default for Barrier {
|
||||
|
||||
impl Barrier {
|
||||
pub async fn wait(self) {
|
||||
self.0.lock().await.recv().await;
|
||||
loop {
|
||||
let in_progress = self.0.completions_in_progress.load(Ordering::Acquire);
|
||||
|
||||
if in_progress == 0 {
|
||||
tracing::info!(id = self.0.id, "wait complete!");
|
||||
break;
|
||||
} else {
|
||||
let waiters = Arc::strong_count(&self.0)
|
||||
.checked_sub(in_progress)
|
||||
// there might be drift between the two, but we are still waiting
|
||||
.unwrap_or(1);
|
||||
tracing::info!(id = self.0.id, waiters, in_progress, "waiting");
|
||||
drop(
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_millis(100),
|
||||
self.0.notify.notified(),
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn maybe_wait(barrier: Option<Barrier>) {
|
||||
@@ -41,9 +102,19 @@ impl PartialEq for Barrier {
|
||||
impl Eq for Barrier {}
|
||||
|
||||
/// Create new Guard and Barrier pair.
|
||||
#[track_caller]
|
||||
pub fn channel() -> (Completion, Barrier) {
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
let rx = Mutex::new(rx);
|
||||
let rx = Arc::new(rx);
|
||||
(Completion(tx), Barrier(rx))
|
||||
static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
let shared = Arc::new(Shared {
|
||||
id: ID_COUNTER.fetch_add(1, Ordering::Relaxed),
|
||||
notify: Default::default(),
|
||||
completions_in_progress: AtomicUsize::new(1),
|
||||
});
|
||||
|
||||
let location = std::panic::Location::caller();
|
||||
|
||||
tracing::info!(id = shared.id, %location, "created");
|
||||
|
||||
(Completion(shared.clone()), Barrier(shared))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user