Add runtime

This commit is contained in:
Bojan Serafimov
2022-06-14 11:08:17 -04:00
parent a1f85715ac
commit c79e72e835
3 changed files with 67 additions and 27 deletions

View File

@@ -276,6 +276,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
let remote_index = tenant_mgr::init_tenant_mgr(conf)?;
pageserver::tenant_threads::init_tenant_task_pool()?;
// Spawn a new thread for the http endpoint
// bind before launching separate thread so the error reported before startup exits
let auth_cloned = auth.clone();

View File

@@ -347,27 +347,7 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
crate::tenant_threads::start_gc_loop(tenant_id)?;
}
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");

View File

@@ -1,11 +1,16 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::str::FromStr;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::Result;
use tokio::task::JoinError;
use std::time::Duration;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc::{self, Sender};
use tracing::*;
use utils::zid::ZTenantId;
@@ -45,6 +50,58 @@ fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
Ok(())
}
static START_GC_LOOP: OnceCell<Sender<ZTenantId>> = OnceCell::new();
pub fn start_gc_loop(tenantid: ZTenantId) -> Result<()> {
START_GC_LOOP
.get()
.unwrap()
.blocking_send(tenantid)
.unwrap();
Ok(())
}
pub fn init_tenant_task_pool() -> Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40)
.max_blocking_threads(100)
.enable_all()
.build()?;
let (send, mut recv) = mpsc::channel::<ZTenantId>(100);
START_GC_LOOP.set(send).unwrap();
thread_mgr::spawn(
ThreadKind::WalReceiverManager, // TODO
None,
None,
"WAL receiver manager main thread",
true,
move || {
runtime.block_on(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => break,
tenantid = recv.recv() => {
let tenantid = tenantid.unwrap();
// Await concurrently with all other futures
futures.push(tokio::spawn(gc_loop(tenantid)));
},
_ = futures.next() => {
// Do nothing. We're waiting on them just to make progress
},
}
}
});
Ok(())
},
)?;
Ok(())
}
///
/// GC thread's main loop
///
@@ -63,10 +120,11 @@ pub async fn gc_loop(tenantid: ZTenantId) -> Result<()> {
if gc_horizon > 0 {
let gc_result = tokio::task::spawn_blocking(move || {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)
}).await;
})
.await;
match gc_result {
Ok(Ok(gc_result)) => {
Ok(Ok(_gc_result)) => {
// Gc success, do nothing
}
Ok(Err(e)) => {
@@ -80,7 +138,7 @@ pub async fn gc_loop(tenantid: ZTenantId) -> Result<()> {
}
}
tokio::time::sleep(gc_period);
tokio::time::sleep(gc_period).await;
}
trace!(
"GC loop stopped for tenant {} state is {:?}",