mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Run compaction as task
This commit is contained in:
@@ -330,23 +330,8 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
|
||||
}
|
||||
(TenantState::Idle, TenantState::Active) => {
|
||||
info!("activating tenant {tenant_id}");
|
||||
let compactor_spawn_result = thread_mgr::spawn(
|
||||
ThreadKind::Compactor,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"Compactor thread",
|
||||
false,
|
||||
move || crate::tenant_threads::compact_loop(tenant_id),
|
||||
);
|
||||
if compactor_spawn_result.is_err() {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
m.get_mut(&tenant_id)
|
||||
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
|
||||
.state = old_state;
|
||||
drop(m);
|
||||
}
|
||||
compactor_spawn_result?;
|
||||
|
||||
crate::tenant_threads::start_compaction_loop(tenant_id)?;
|
||||
crate::tenant_threads::start_gc_loop(tenant_id)?;
|
||||
}
|
||||
(TenantState::Idle, TenantState::Stopping) => {
|
||||
|
||||
@@ -14,8 +14,8 @@ use utils::zid::ZTenantId;
|
||||
///
|
||||
/// Compaction thread's main loop
|
||||
///
|
||||
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
if let Err(err) = compact_loop_ext(tenantid) {
|
||||
pub async fn compaction_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
if let Err(err) = compaction_loop_ext(tenantid).await {
|
||||
error!("compact loop terminated with error: {:?}", err);
|
||||
Err(err)
|
||||
} else {
|
||||
@@ -23,7 +23,7 @@ pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
async fn compaction_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
@@ -36,7 +36,19 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
|
||||
// Compact timelines
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
repo.compaction_iteration()?;
|
||||
let compaction_result =
|
||||
tokio::task::spawn_blocking(move || repo.compaction_iteration()).await;
|
||||
match compaction_result {
|
||||
Ok(Ok(())) => {
|
||||
// success, do nothing
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
anyhow::bail!(e.context("Compaction failed"));
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("Compaction join error {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
@@ -48,6 +60,7 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
}
|
||||
|
||||
static START_GC_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new();
|
||||
static START_COMPACTION_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new();
|
||||
|
||||
pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
START_GC_LOOP
|
||||
@@ -58,6 +71,15 @@ pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start_compaction_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
START_COMPACTION_LOOP
|
||||
.get()
|
||||
.unwrap()
|
||||
.blocking_send(tenantid)
|
||||
.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_tenant_task_pool() -> Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("tenant-task-worker")
|
||||
@@ -66,8 +88,11 @@ pub fn init_tenant_task_pool() -> Result<()> {
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let (send, mut recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_GC_LOOP.set(send).unwrap();
|
||||
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_GC_LOOP.set(gc_send).unwrap();
|
||||
|
||||
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_COMPACTION_LOOP.set(compaction_send).unwrap();
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::WalReceiverManager, // TODO
|
||||
@@ -80,9 +105,12 @@ pub fn init_tenant_task_pool() -> Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = thread_mgr::shutdown_watcher() => break,
|
||||
tenantid = recv.recv() => {
|
||||
tenantid = gc_recv.recv() => {
|
||||
tokio::spawn(gc_loop(tenantid.unwrap()));
|
||||
},
|
||||
tenantid = compaction_recv.recv() => {
|
||||
tokio::spawn(compaction_loop(tenantid.unwrap()));
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -119,12 +147,10 @@ pub async fn gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
// Gc success, do nothing
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!("Gc failed: {}", e);
|
||||
// TODO maybe also don't reschedule on error?
|
||||
anyhow::bail!(e.context("Gc failed"));
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Gc failed: {}", e);
|
||||
// TODO maybe also don't reschedule on error?
|
||||
anyhow::bail!("Gc join error {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user