From bf76f43ea4c9d1cb0d5bcd47fafa12747fe98b36 Mon Sep 17 00:00:00 2001
From: Bojan Serafimov
Date: Mon, 27 Jun 2022 17:54:31 -0400
Subject: [PATCH] Wait for tasks to complete

---
Reviewer note (this section is ignored by `git am`): this patch was recovered
from a copy whose line breaks and leading whitespace had been collapsed.
Line structure was rebuilt from the hunk headers and the diffstat. Indentation
of the context lines follows rustfmt conventions, and the generic arguments of
the two `let period: Result<...>` context lines were restored by inference.
TODO: confirm both against the target tree before applying (or apply with
`git apply --ignore-whitespace`).

 pageserver/src/layered_repository.rs |  8 ++++++++
 pageserver/src/tenant_mgr.rs         |  4 ++++
 pageserver/src/tenant_tasks.rs       | 28 +++++++++++++++++++++++++---
 3 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index fdd03ecf8b..257544a43b 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -158,6 +158,13 @@ pub struct LayeredRepository {
     // Global pageserver config parameters
     pub conf: &'static PageServerConf,
 
+    // Allows us to gracefully cancel operations that edit the directory
+    // that backs this layered repository. Usage:
+    //
+    // Use `let _guard = file_lock.try_read()` while writing any files.
+    // Use `let _guard = file_lock.write().unwrap()` to wait for all writes to finish.
+    pub file_lock: RwLock<()>,
+
     // Overridden tenant-specific config parameters.
     // We keep TenantConfOpt sturct here to preserve the information
     // about parameters that are not set.
@@ -685,6 +692,7 @@ impl LayeredRepository {
     ) -> LayeredRepository {
         LayeredRepository {
             tenant_id,
+            file_lock: RwLock::new(()),
             conf,
             tenant_conf: Arc::new(RwLock::new(tenant_conf)),
             timelines: Mutex::new(HashMap::new()),
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index 5f25c3576f..c73fed140a 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -345,6 +345,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
                 Some(tenant_id),
                 None,
             );
+
+            // Wait until all gc/compaction tasks finish
+            let repo = get_repository_for_tenant(tenant_id)?;
+            let _guard = repo.file_lock.write().unwrap();
         }
     }
 
diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs
index 64e67bd6d0..bc49d98a12 100644
--- a/pageserver/src/tenant_tasks.rs
+++ b/pageserver/src/tenant_tasks.rs
@@ -37,10 +37,20 @@ async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
 
         // Run blocking part of the task
         let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
+            // Break if tenant is not active
             if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
                 return Ok(ControlFlow::Break(()));
             }
+
+            // Break if we're not allowed to write to disk
             let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+            // TODO do this inside repo.compaction_iteration instead.
+            let _guard = match repo.file_lock.try_read() {
+                Ok(g) => g,
+                Err(_) => return Ok(ControlFlow::Break(())),
+            };
+
+            // Run compaction
             let compaction_period = repo.get_compaction_period();
             repo.compaction_iteration()?;
             Ok(ControlFlow::Continue(compaction_period))
@@ -140,14 +150,17 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
                 loop {
                     tokio::select! {
                         _ = thread_mgr::shutdown_watcher() => {
-                            // TODO do this from thread_mgr? Extend it to work with tasks?
+                            // Send cancellation to all tasks
                             for (_, cancel) in gc_loops.drain() {
                                 cancel.send(()).ok();
                             }
                             for (_, cancel) in compaction_loops.drain() {
                                 cancel.send(()).ok();
                             }
-                            // TODO wait for tasks to die?
+
+                            // Exit after all tasks finish
+                            // TODO log any errors
+                            while let Some(_) = futures.next().await { }
                             break;
                         },
                         tenantid = gc_recv.recv() => {
@@ -212,13 +225,22 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
 
         // Run blocking part of the task
         let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
+            // Break if tenant is not active
             if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
                 return Ok(ControlFlow::Break(()));
             }
+
+            // Break if we're not allowed to write to disk
             let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+            // TODO do this inside repo.gc_iteration instead.
+            let _guard = match repo.file_lock.try_read() {
+                Ok(g) => g,
+                Err(_) => return Ok(ControlFlow::Break(())),
+            };
+
+            // Run gc
             let gc_period = repo.get_gc_period();
             let gc_horizon = repo.get_gc_horizon();
-
             if gc_horizon > 0 {
                 repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
             }