mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
Wait for tasks to complete
This commit is contained in:
@@ -158,6 +158,13 @@ pub struct LayeredRepository {
|
||||
// Global pageserver config parameters
|
||||
pub conf: &'static PageServerConf,
|
||||
|
||||
// Allows us to gracefully cancel operations that edit the directory
|
||||
// that backs this layered repository. Usage:
|
||||
//
|
||||
// Use `let _guard = file_lock.try_read()` while writing any files.
|
||||
// Use `let _guard = file_lock.write().unwrap()` to wait for all writes to finish.
|
||||
pub file_lock: RwLock<()>,
|
||||
|
||||
// Overridden tenant-specific config parameters.
|
||||
// We keep TenantConfOpt sturct here to preserve the information
|
||||
// about parameters that are not set.
|
||||
@@ -685,6 +692,7 @@ impl LayeredRepository {
|
||||
) -> LayeredRepository {
|
||||
LayeredRepository {
|
||||
tenant_id,
|
||||
file_lock: RwLock::new(()),
|
||||
conf,
|
||||
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
|
||||
@@ -345,6 +345,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
|
||||
Some(tenant_id),
|
||||
None,
|
||||
);
|
||||
|
||||
// Wait until all gc/compaction tasks finish
|
||||
let repo = get_repository_for_tenant(tenant_id)?;
|
||||
let _guard = repo.file_lock.write().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,10 +37,20 @@ async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
|
||||
|
||||
// Run blocking part of the task
|
||||
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
|
||||
// Break if tenant is not active
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
return Ok(ControlFlow::Break(()));
|
||||
}
|
||||
|
||||
// Break if we're not allowed to write to disk
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
// TODO do this inside repo.compaction_iteration instead.
|
||||
let _guard = match repo.file_lock.try_read() {
|
||||
Ok(g) => g,
|
||||
Err(_) => return Ok(ControlFlow::Break(())),
|
||||
};
|
||||
|
||||
// Run compaction
|
||||
let compaction_period = repo.get_compaction_period();
|
||||
repo.compaction_iteration()?;
|
||||
Ok(ControlFlow::Continue(compaction_period))
|
||||
@@ -140,14 +150,17 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = thread_mgr::shutdown_watcher() => {
|
||||
// TODO do this from thread_mgr? Extend it to work with tasks?
|
||||
// Send cancellation to all tasks
|
||||
for (_, cancel) in gc_loops.drain() {
|
||||
cancel.send(()).ok();
|
||||
}
|
||||
for (_, cancel) in compaction_loops.drain() {
|
||||
cancel.send(()).ok();
|
||||
}
|
||||
// TODO wait for tasks to die?
|
||||
|
||||
// Exit after all tasks finish
|
||||
// TODO log any errors
|
||||
while let Some(_) = futures.next().await { }
|
||||
break;
|
||||
},
|
||||
tenantid = gc_recv.recv() => {
|
||||
@@ -212,13 +225,22 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
|
||||
|
||||
// Run blocking part of the task
|
||||
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
|
||||
// Break if tenant is not active
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
return Ok(ControlFlow::Break(()));
|
||||
}
|
||||
|
||||
// Break if we're not allowed to write to disk
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
// TODO do this inside repo.gc_iteration instead.
|
||||
let _guard = match repo.file_lock.try_read() {
|
||||
Ok(g) => g,
|
||||
Err(_) => return Ok(ControlFlow::Break(())),
|
||||
};
|
||||
|
||||
// Run gc
|
||||
let gc_period = repo.get_gc_period();
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
|
||||
if gc_horizon > 0 {
|
||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user