noisy completion

This commit is contained in:
John Spray
2023-10-19 18:42:03 +01:00
parent d8bbe302af
commit e0d3b8ebdc
2 changed files with 38 additions and 4 deletions

View File

@@ -1,12 +1,36 @@
use std::sync::Arc;
use std::sync::{atomic::AtomicI32, Arc};
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
pub struct Completion {
sender: mpsc::Sender<()>,
refcount: Arc<AtomicI32>,
}
impl Clone for Completion {
fn clone(&self) -> Self {
let i = self
.refcount
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
tracing::info!("Completion::clone[{:p}]: {i}", &(*self.refcount));
Self {
sender: self.sender.clone(),
refcount: self.refcount.clone(),
}
}
}
impl Drop for Completion {
fn drop(&mut self) {
let i = self
.refcount
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
tracing::info!("Completion::drop[{:p}]: {i}", &(*self.refcount));
}
}
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
@@ -45,5 +69,11 @@ pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
(
Completion {
sender: tx,
refcount: Arc::new(AtomicI32::new(1)),
},
Barrier(rx),
)
}

View File

@@ -363,6 +363,10 @@ fn start_pageserver(
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
let (tenants_can_start, tenants_can_start_barrier) = utils::completion::channel();
tracing::info!("init_remote_done_tx:");
let c = init_remote_done_tx.clone();
drop(c);
let order = pageserver::InitializationOrder {
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),