mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Expand blocking scope
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
|
||||
use std::ops::ControlFlow;
|
||||
|
||||
use crate::repository::Repository;
|
||||
use crate::tenant_mgr::TenantState;
|
||||
use crate::thread_mgr::ThreadKind;
|
||||
use crate::{tenant_mgr, thread_mgr};
|
||||
use anyhow::Result;
|
||||
use anyhow;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
use tracing::*;
|
||||
@@ -14,7 +16,7 @@ use utils::zid::ZTenantId;
|
||||
///
|
||||
/// Compaction task's main loop
|
||||
///
|
||||
async fn compaction_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
async fn compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
if let Err(err) = compaction_loop_ext(tenantid).await {
|
||||
error!("compact loop terminated with error: {:?}", err);
|
||||
Err(err)
|
||||
@@ -23,30 +25,35 @@ async fn compaction_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn compaction_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
async fn compaction_loop_ext(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let compaction_period = repo.get_compaction_period();
|
||||
|
||||
tokio::time::sleep(compaction_period).await;
|
||||
trace!("compaction loop for tenant {} waking up", tenantid);
|
||||
|
||||
// Compact timelines
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let compaction_result =
|
||||
tokio::task::spawn_blocking(move || repo.compaction_iteration()).await;
|
||||
match compaction_result {
|
||||
Ok(Ok(())) => {
|
||||
// success, do nothing
|
||||
// Run blocking part of the task
|
||||
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
return Ok(ControlFlow::Break(()));
|
||||
}
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let compaction_period = repo.get_compaction_period();
|
||||
repo.compaction_iteration()?;
|
||||
Ok(ControlFlow::Continue(compaction_period))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Handle result
|
||||
match period {
|
||||
Ok(Ok(ControlFlow::Continue(period))) => {
|
||||
tokio::time::sleep(period).await;
|
||||
}
|
||||
Ok(Ok(ControlFlow::Break(()))) => {
|
||||
break;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
anyhow::bail!(e.context("Compaction failed"));
|
||||
anyhow::bail!("Compaction failed: {}", e);
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("Compaction join error {}", e);
|
||||
anyhow::bail!("Compaction join error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -65,30 +72,30 @@ static START_COMPACTION_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new();
|
||||
/// Spawn a task that will periodically schedule garbage collection until
|
||||
/// the tenant becomes inactive. This should be called on tenant
|
||||
/// activation.
|
||||
pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
START_GC_LOOP
|
||||
.get()
|
||||
.unwrap()
|
||||
.expect("failed to get START_GC_LOOP")
|
||||
.blocking_send(tenantid)
|
||||
.unwrap();
|
||||
.expect("Failed to send to START_GC_LOOP channel");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn a task that will periodically schedule compaction until
|
||||
/// the tenant becomes inactive. This should be called on tenant
|
||||
/// activation.
|
||||
pub fn start_compaction_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
START_COMPACTION_LOOP
|
||||
.get()
|
||||
.unwrap()
|
||||
.expect("failed to get START_COMPACTION_LOOP")
|
||||
.blocking_send(tenantid)
|
||||
.unwrap();
|
||||
.expect("failed to send to START_COMPACTION_LOOP");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn the TenantTaskManager
|
||||
/// This needs to be called before start_gc_loop or start_compaction_loop
|
||||
pub fn init_tenant_task_pool() -> Result<()> {
|
||||
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("tenant-task-worker")
|
||||
.worker_threads(40) // Way more than necessary
|
||||
@@ -97,28 +104,37 @@ pub fn init_tenant_task_pool() -> Result<()> {
|
||||
.build()?;
|
||||
|
||||
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_GC_LOOP.set(gc_send).unwrap();
|
||||
START_GC_LOOP
|
||||
.set(gc_send)
|
||||
.expect("Failed to set START_GC_LOOP");
|
||||
|
||||
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
|
||||
START_COMPACTION_LOOP.set(compaction_send).unwrap();
|
||||
START_COMPACTION_LOOP
|
||||
.set(compaction_send)
|
||||
.expect("Failed to set START_COMPACTION_LOOP");
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::TenantTaskManager,
|
||||
None,
|
||||
None,
|
||||
"WAL receiver manager main thread",
|
||||
"Tenant task manager main thread",
|
||||
true,
|
||||
move || {
|
||||
runtime.block_on(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = thread_mgr::shutdown_watcher() => break,
|
||||
_ = thread_mgr::shutdown_watcher() => {
|
||||
// TODO cancel all running tasks
|
||||
break
|
||||
},
|
||||
// TODO don't spawn if already running
|
||||
tenantid = gc_recv.recv() => {
|
||||
tokio::spawn(gc_loop(tenantid.unwrap()));
|
||||
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
|
||||
tokio::spawn(gc_loop(tenantid));
|
||||
},
|
||||
tenantid = compaction_recv.recv() => {
|
||||
tokio::spawn(compaction_loop(tenantid.unwrap()));
|
||||
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
|
||||
tokio::spawn(compaction_loop(tenantid));
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -133,38 +149,42 @@ pub fn init_tenant_task_pool() -> Result<()> {
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
async fn gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
async fn gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
|
||||
trace!("gc loop for tenant {} waking up", tenantid);
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let gc_period = repo.get_gc_period();
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
|
||||
// Garbage collect old files that are not needed for PITR anymore
|
||||
if gc_horizon > 0 {
|
||||
let gc_result = tokio::task::spawn_blocking(move || {
|
||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)
|
||||
})
|
||||
.await;
|
||||
// Run blocking part of the task
|
||||
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
return Ok(ControlFlow::Break(()));
|
||||
}
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let gc_period = repo.get_gc_period();
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
|
||||
match gc_result {
|
||||
Ok(Ok(_gc_result)) => {
|
||||
// Gc success, do nothing
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
anyhow::bail!(e.context("Gc failed"));
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("Gc join error {}", e);
|
||||
}
|
||||
if gc_horizon > 0 {
|
||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
||||
}
|
||||
|
||||
Ok(ControlFlow::Continue(gc_period))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Handle result
|
||||
match period {
|
||||
Ok(Ok(ControlFlow::Continue(period))) => {
|
||||
tokio::time::sleep(period).await;
|
||||
}
|
||||
Ok(Ok(ControlFlow::Break(()))) => {
|
||||
break;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
anyhow::bail!("Gc failed: {}", e);
|
||||
}
|
||||
Err(e) => {
|
||||
anyhow::bail!("Gc join error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(gc_period).await;
|
||||
}
|
||||
trace!(
|
||||
"GC loop stopped for tenant {} state is {:?}",
|
||||
|
||||
Reference in New Issue
Block a user