Gc/compaction thread pool, take 2 (#1933)

Decrease the number of pageserver threads by running gc and compaction in a blocking tokio thread pool
This commit is contained in:
bojanserafimov
2022-07-05 02:06:40 -04:00
committed by GitHub
parent 6abdb12724
commit d29c545b5d
9 changed files with 386 additions and 127 deletions

View File

@@ -263,6 +263,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// start profiler (if enabled)
let profiler_guard = profiling::init_profiler(conf);
pageserver::tenant_tasks::init_tenant_task_pool()?;
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,

View File

@@ -158,6 +158,18 @@ pub struct LayeredRepository {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
// Allows us to gracefully cancel operations that edit the directory
// that backs this layered repository. Usage:
//
// Use `let _guard = file_lock.try_read()` while writing any files.
// Use `let _guard = file_lock.write().unwrap()` to wait for all writes to finish.
//
// TODO try_read this lock during checkpoint as well to prevent race
// between checkpoint and detach/delete.
// TODO try_read this lock for all gc/compaction operations, not just
// ones scheduled by the tenant task manager.
pub file_lock: RwLock<()>,
// Overridden tenant-specific config parameters.
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
@@ -685,6 +697,7 @@ impl LayeredRepository {
) -> LayeredRepository {
LayeredRepository {
tenant_id,
file_lock: RwLock::new(()),
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),

View File

@@ -13,7 +13,7 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod tenant_tasks;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -230,8 +230,6 @@ pub fn shutdown_all_tenants() {
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
@@ -330,44 +328,12 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
let compactor_spawn_result = thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
);
if compactor_spawn_result.is_err() {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
// TODO maybe use tokio::sync::watch instead?
crate::tenant_tasks::start_compaction_loop(tenant_id)?;
crate::tenant_tasks::start_gc_loop(tenant_id)?;
}
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");
@@ -379,8 +345,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
Some(tenant_id),
None,
);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
// Wait until all gc/compaction tasks finish
let repo = get_repository_for_tenant(tenant_id)?;
let _guard = repo.file_lock.write().unwrap();
}
}

View File

@@ -0,0 +1,288 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;
use crate::repository::Repository;
use crate::tenant_mgr::TenantState;
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::*;
use utils::zid::ZTenantId;
static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_task_events",
"Number of task start/stop/fail events.",
&["event"],
)
.expect("Failed to register tenant_task_events metric")
});
///
/// Compaction task's main loop
///
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
// Break if tenant is not active
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
return Ok(ControlFlow::Break(()));
}
// Break if we're not allowed to write to disk
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// TODO do this inside repo.compaction_iteration instead.
let _guard = match repo.file_lock.try_read() {
Ok(g) => g,
Err(_) => return Ok(ControlFlow::Break(())),
};
// Run compaction
let compaction_period = repo.get_compaction_period();
repo.compaction_iteration()?;
Ok(ControlFlow::Continue(compaction_period))
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Compaction failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Compaction join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"compaction loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}
static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
static START_COMPACTION_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
/// Spawn a task that will periodically schedule garbage collection until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_GC_LOOP
.get()
.context("Failed to get START_GC_LOOP")?
.blocking_send(tenantid)
.context("Failed to send to START_GC_LOOP channel")?;
Ok(())
}
/// Spawn a task that will periodically schedule compaction until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_COMPACTION_LOOP
.get()
.context("failed to get START_COMPACTION_LOOP")?
.blocking_send(tenantid)
.context("failed to send to START_COMPACTION_LOOP")?;
Ok(())
}
/// Spawn the TenantTaskManager
/// This needs to be called before start_gc_loop or start_compaction_loop
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
START_GC_LOOP
.set(gc_send)
.expect("Failed to set START_GC_LOOP");
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
START_COMPACTION_LOOP
.set(compaction_send)
.expect("Failed to set START_COMPACTION_LOOP");
// TODO this is getting repetitive
let mut gc_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
let mut compaction_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
thread_mgr::spawn(
ThreadKind::TenantTaskManager,
None,
None,
"Tenant task manager main thread",
true,
move || {
runtime.block_on(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => {
// Send cancellation to all tasks
for (_, cancel) in gc_loops.drain() {
cancel.send(()).ok();
}
for (_, cancel) in compaction_loops.drain() {
cancel.send(()).ok();
}
// Exit after all tasks finish
while let Some(result) = futures.next().await {
match result {
Ok(()) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Err(e) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
}
}
break;
},
tenantid = gc_recv.recv() => {
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
.instrument(info_span!("gc loop", tenant = %tenantid)));
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
tenantid = compaction_recv.recv() => {
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
.instrument(info_span!("compaction loop", tenant = %tenantid)));
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
result = futures.next() => {
// Log and count any unhandled panics
match result {
Some(Ok(())) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Some(Err(e)) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
None => {},
};
},
}
}
});
Ok(())
},
)?;
Ok(())
}
///
/// GC task's main loop
///
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
// Break if tenant is not active
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
return Ok(ControlFlow::Break(()));
}
// Break if we're not allowed to write to disk
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// TODO do this inside repo.gc_iteration instead.
let _guard = match repo.file_lock.try_read() {
Ok(g) => g,
Err(_) => return Ok(ControlFlow::Break(())),
};
// Run gc
let gc_period = repo.get_gc_period();
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
Ok(ControlFlow::Continue(gc_period))
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Gc failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Gc join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"GC loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}

View File

@@ -94,11 +94,8 @@ pub enum ThreadKind {
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
WalReceiverManager,
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Thread that handles GC of a tenant
GarbageCollector,
// Thread that schedules new compaction and gc jobs
TenantTaskManager,
// Thread that flushes frozen in-memory layers to disk
LayerFlushThread,

View File

@@ -0,0 +1,70 @@
from fixtures.neon_fixtures import NeonEnvBuilder, wait_until
from uuid import UUID
import time
def get_only_element(l):
assert len(l) == 1
return l[0]
# Test that gc and compaction tenant tasks start and stop correctly
def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
# The gc and compaction loops don't bother to watch for tenant state
# changes while sleeping, so we use small periods to make this test
# run faster. With default settings we'd have to wait longer for tasks
# to notice state changes and shut down.
# TODO fix this behavior in the pageserver
tenant_config = "{gc_period = '1 s', compaction_period = '1 s'}"
neon_env_builder.pageserver_config_override = f"tenant_config={tenant_config}"
name = "test_tenant_tasks"
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
def get_state(tenant):
all_states = client.tenant_list()
matching = [t for t in all_states if t["id"] == tenant.hex]
return get_only_element(matching)["state"]
def get_metric_value(name):
metrics = client.get_metrics()
relevant = [line for line in metrics.splitlines() if line.startswith(name)]
if len(relevant) == 0:
return 0
line = get_only_element(relevant)
value = line.lstrip(name).strip()
return int(value)
def detach_all_timelines(tenant):
timelines = [UUID(t["timeline_id"]) for t in client.timeline_list(tenant)]
for t in timelines:
client.timeline_detach(tenant, t)
def assert_idle(tenant):
assert get_state(tenant) == "Idle"
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()
timeline = env.neon_cli.create_timeline(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
assert (get_state(tenant) == "Active")
# Stop compute
pg.stop()
# Detach all tenants and wait for them to go idle
# TODO they should be already idle since there are no active computes
for tenant_info in client.tenant_list():
tenant_id = UUID(tenant_info["id"])
detach_all_timelines(tenant_id)
wait_until(10, 0.2, lambda: assert_idle(tenant_id))
# Assert that all tasks finish quickly after tenants go idle
def assert_tasks_finish():
tasks_started = get_metric_value('pageserver_tenant_task_events{event="start"}')
tasks_ended = get_metric_value('pageserver_tenant_task_events{event="stop"}')
tasks_panicked = get_metric_value('pageserver_tenant_task_events{event="panic"}')
assert tasks_started == tasks_ended
assert tasks_panicked == 0
wait_until(10, 0.2, assert_tasks_finish)

View File

@@ -2143,7 +2143,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
assert (mismatch, error) == ([], [])
def wait_until(number_of_iterations: int, interval: int, func):
def wait_until(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns successfully, without exception. Returns the last return value
from the the function.