From d29c545b5d214413e5fc1199b2aece650561cc44 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Tue, 5 Jul 2022 02:06:40 -0400 Subject: [PATCH] Gc/compaction thread pool, take 2 (#1933) Decrease the number of pageserver threads by running gc and compaction in a blocking tokio thread pool --- pageserver/src/bin/pageserver.rs | 2 + pageserver/src/layered_repository.rs | 13 + pageserver/src/lib.rs | 2 +- pageserver/src/tenant_mgr.rs | 50 +-- pageserver/src/tenant_tasks.rs | 288 ++++++++++++++++++ pageserver/src/tenant_threads.rs | 79 ----- pageserver/src/thread_mgr.rs | 7 +- test_runner/batch_others/test_tenant_tasks.py | 70 +++++ test_runner/fixtures/neon_fixtures.py | 2 +- 9 files changed, 386 insertions(+), 127 deletions(-) create mode 100644 pageserver/src/tenant_tasks.rs delete mode 100644 pageserver/src/tenant_threads.rs create mode 100644 test_runner/batch_others/test_tenant_tasks.py diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 1d407a29bc..b539964414 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -263,6 +263,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() // start profiler (if enabled) let profiler_guard = profiling::init_profiler(conf); + pageserver::tenant_tasks::init_tenant_task_pool()?; + // initialize authentication for incoming connections let auth = match &conf.auth_type { AuthType::Trust | AuthType::MD5 => None, diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index af6c7ba9fd..67f024ef59 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -158,6 +158,18 @@ pub struct LayeredRepository { // Global pageserver config parameters pub conf: &'static PageServerConf, + // Allows us to gracefully cancel operations that edit the directory + // that backs this layered repository. Usage: + // + // Use `let _guard = file_lock.try_read()` while writing any files. + // Use `let _guard = file_lock.write().unwrap()` to wait for all writes to finish. + // + // TODO try_read this lock during checkpoint as well to prevent race + // between checkpoint and detach/delete. + // TODO try_read this lock for all gc/compaction operations, not just + // ones scheduled by the tenant task manager. + pub file_lock: RwLock<()>, + // Overridden tenant-specific config parameters. // We keep TenantConfOpt sturct here to preserve the information // about parameters that are not set. @@ -685,6 +697,7 @@ impl LayeredRepository { ) -> LayeredRepository { LayeredRepository { tenant_id, + file_lock: RwLock::new(()), conf, tenant_conf: Arc::new(RwLock::new(tenant_conf)), timelines: Mutex::new(HashMap::new()), diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index a68c277114..c9c00d75e2 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -13,7 +13,7 @@ pub mod repository; pub mod storage_sync; pub mod tenant_config; pub mod tenant_mgr; -pub mod tenant_threads; +pub mod tenant_tasks; pub mod thread_mgr; pub mod timelines; pub mod virtual_file; diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c48b021d1f..c73fed140a 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -230,8 +230,6 @@ pub fn shutdown_all_tenants() { drop(m); thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None); - thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None); // Ok, no background threads running anymore. Flush any remaining data in // memory to disk. @@ -330,44 +328,12 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: } (TenantState::Idle, TenantState::Active) => { info!("activating tenant {tenant_id}"); - let compactor_spawn_result = thread_mgr::spawn( - ThreadKind::Compactor, - Some(tenant_id), - None, - "Compactor thread", - false, - move || crate::tenant_threads::compact_loop(tenant_id), - ); - if compactor_spawn_result.is_err() { - let mut m = tenants_state::write_tenants(); - m.get_mut(&tenant_id) - .with_context(|| format!("Tenant not found for id {tenant_id}"))? - .state = old_state; - drop(m); - } - compactor_spawn_result?; - let gc_spawn_result = thread_mgr::spawn( - ThreadKind::GarbageCollector, - Some(tenant_id), - None, - "GC thread", - false, - move || crate::tenant_threads::gc_loop(tenant_id), - ) - .map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature - .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); - - if let Err(e) = &gc_spawn_result { - let mut m = tenants_state::write_tenants(); - m.get_mut(&tenant_id) - .with_context(|| format!("Tenant not found for id {tenant_id}"))? - .state = old_state; - drop(m); - error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); - return gc_spawn_result; - } + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive. + // TODO maybe use tokio::sync::watch instead? + crate::tenant_tasks::start_compaction_loop(tenant_id)?; + crate::tenant_tasks::start_gc_loop(tenant_id)?; } (TenantState::Idle, TenantState::Stopping) => { info!("stopping idle tenant {tenant_id}"); @@ -379,8 +345,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: Some(tenant_id), None, ); - thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); + + // Wait until all gc/compaction tasks finish + let repo = get_repository_for_tenant(tenant_id)?; + let _guard = repo.file_lock.write().unwrap(); } } diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs new file mode 100644 index 0000000000..6871ac3001 --- /dev/null +++ b/pageserver/src/tenant_tasks.rs @@ -0,0 +1,288 @@ +//! This module contains functions to serve per-tenant background processes, +//! such as compaction and GC + +use std::collections::HashMap; +use std::ops::ControlFlow; +use std::time::Duration; + +use crate::repository::Repository; +use crate::tenant_mgr::TenantState; +use crate::thread_mgr::ThreadKind; +use crate::{tenant_mgr, thread_mgr}; +use anyhow::{self, Context}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use metrics::{register_int_counter_vec, IntCounterVec}; +use once_cell::sync::{Lazy, OnceCell}; +use tokio::sync::mpsc; +use tokio::sync::watch; +use tracing::*; +use utils::zid::ZTenantId; + +static TENANT_TASK_EVENTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_tenant_task_events", + "Number of task start/stop/fail events.", + &["event"], + ) + .expect("Failed to register tenant_task_events metric") +}); + +/// +/// Compaction task's main loop +/// +async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { + loop { + trace!("waking up"); + + // Run blocking part of the task + let period: Result, _> = tokio::task::spawn_blocking(move || { + // Break if tenant is not active + if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { + return Ok(ControlFlow::Break(())); + } + + // Break if we're not allowed to write to disk + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + // TODO do this inside repo.compaction_iteration instead. + let _guard = match repo.file_lock.try_read() { + Ok(g) => g, + Err(_) => return Ok(ControlFlow::Break(())), + }; + + // Run compaction + let compaction_period = repo.get_compaction_period(); + repo.compaction_iteration()?; + Ok(ControlFlow::Continue(compaction_period)) + }) + .await; + + // Decide whether to sleep or break + let sleep_duration = match period { + Ok(Ok(ControlFlow::Continue(period))) => period, + Ok(Ok(ControlFlow::Break(()))) => break, + Ok(Err(e)) => { + error!("Compaction failed, retrying: {}", e); + Duration::from_secs(2) + } + Err(e) => { + error!("Compaction join error, retrying: {}", e); + Duration::from_secs(2) + } + }; + + // Sleep + tokio::select! { + _ = cancel.changed() => { + trace!("received cancellation request"); + break; + }, + _ = tokio::time::sleep(sleep_duration) => {}, + } + } + + trace!( + "compaction loop stopped. State is {:?}", + tenant_mgr::get_tenant_state(tenantid) + ); +} + +static START_GC_LOOP: OnceCell> = OnceCell::new(); +static START_COMPACTION_LOOP: OnceCell> = OnceCell::new(); + +/// Spawn a task that will periodically schedule garbage collection until +/// the tenant becomes inactive. This should be called on tenant +/// activation. +pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> { + START_GC_LOOP + .get() + .context("Failed to get START_GC_LOOP")? + .blocking_send(tenantid) + .context("Failed to send to START_GC_LOOP channel")?; + Ok(()) +} + +/// Spawn a task that will periodically schedule compaction until +/// the tenant becomes inactive. This should be called on tenant +/// activation. +pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> { + START_COMPACTION_LOOP + .get() + .context("failed to get START_COMPACTION_LOOP")? + .blocking_send(tenantid) + .context("failed to send to START_COMPACTION_LOOP")?; + Ok(()) +} + +/// Spawn the TenantTaskManager +/// This needs to be called before start_gc_loop or start_compaction_loop +pub fn init_tenant_task_pool() -> anyhow::Result<()> { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("tenant-task-worker") + .worker_threads(40) // Way more than necessary + .max_blocking_threads(100) // Way more than necessary + .enable_all() + .build()?; + + let (gc_send, mut gc_recv) = mpsc::channel::(100); + START_GC_LOOP + .set(gc_send) + .expect("Failed to set START_GC_LOOP"); + + let (compaction_send, mut compaction_recv) = mpsc::channel::(100); + START_COMPACTION_LOOP + .set(compaction_send) + .expect("Failed to set START_COMPACTION_LOOP"); + + // TODO this is getting repetitive + let mut gc_loops = HashMap::>::new(); + let mut compaction_loops = HashMap::>::new(); + + thread_mgr::spawn( + ThreadKind::TenantTaskManager, + None, + None, + "Tenant task manager main thread", + true, + move || { + runtime.block_on(async move { + let mut futures = FuturesUnordered::new(); + loop { + tokio::select! { + _ = thread_mgr::shutdown_watcher() => { + // Send cancellation to all tasks + for (_, cancel) in gc_loops.drain() { + cancel.send(()).ok(); + } + for (_, cancel) in compaction_loops.drain() { + cancel.send(()).ok(); + } + + // Exit after all tasks finish + while let Some(result) = futures.next().await { + match result { + Ok(()) => { + TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); + }, + Err(e) => { + TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc(); + error!("loop join error {}", e) + }, + } + } + break; + }, + tenantid = gc_recv.recv() => { + let tenantid = tenantid.expect("Gc task channel closed unexpectedly"); + + // Spawn new task, request cancellation of the old one if exists + let (cancel_send, cancel_recv) = watch::channel(()); + let handle = tokio::spawn(gc_loop(tenantid, cancel_recv) + .instrument(info_span!("gc loop", tenant = %tenantid))); + if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) { + old_cancel_send.send(()).ok(); + } + + // Update metrics, remember handle + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + futures.push(handle); + }, + tenantid = compaction_recv.recv() => { + let tenantid = tenantid.expect("Compaction task channel closed unexpectedly"); + + // Spawn new task, request cancellation of the old one if exists + let (cancel_send, cancel_recv) = watch::channel(()); + let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv) + .instrument(info_span!("compaction loop", tenant = %tenantid))); + if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) { + old_cancel_send.send(()).ok(); + } + + // Update metrics, remember handle + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + futures.push(handle); + }, + result = futures.next() => { + // Log and count any unhandled panics + match result { + Some(Ok(())) => { + TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); + }, + Some(Err(e)) => { + TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc(); + error!("loop join error {}", e) + }, + None => {}, + }; + }, + } + } + }); + Ok(()) + }, + )?; + + Ok(()) +} + +/// +/// GC task's main loop +/// +async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { + loop { + trace!("waking up"); + + // Run blocking part of the task + let period: Result, _> = tokio::task::spawn_blocking(move || { + // Break if tenant is not active + if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { + return Ok(ControlFlow::Break(())); + } + + // Break if we're not allowed to write to disk + let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + // TODO do this inside repo.gc_iteration instead. + let _guard = match repo.file_lock.try_read() { + Ok(g) => g, + Err(_) => return Ok(ControlFlow::Break(())), + }; + + // Run gc + let gc_period = repo.get_gc_period(); + let gc_horizon = repo.get_gc_horizon(); + if gc_horizon > 0 { + repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; + } + + Ok(ControlFlow::Continue(gc_period)) + }) + .await; + + // Decide whether to sleep or break + let sleep_duration = match period { + Ok(Ok(ControlFlow::Continue(period))) => period, + Ok(Ok(ControlFlow::Break(()))) => break, + Ok(Err(e)) => { + error!("Gc failed, retrying: {}", e); + Duration::from_secs(2) + } + Err(e) => { + error!("Gc join error, retrying: {}", e); + Duration::from_secs(2) + } + }; + + // Sleep + tokio::select! { + _ = cancel.changed() => { + trace!("received cancellation request"); + break; + }, + _ = tokio::time::sleep(sleep_duration) => {}, + } + } + trace!( + "GC loop stopped. State is {:?}", + tenant_mgr::get_tenant_state(tenantid) + ); +} diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs deleted file mode 100644 index b904d9040d..0000000000 --- a/pageserver/src/tenant_threads.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! This module contains functions to serve per-tenant background processes, -//! such as compaction and GC -use crate::repository::Repository; -use crate::tenant_mgr; -use crate::tenant_mgr::TenantState; -use anyhow::Result; -use std::time::Duration; -use tracing::*; -use utils::zid::ZTenantId; - -/// -/// Compaction thread's main loop -/// -pub fn compact_loop(tenantid: ZTenantId) -> Result<()> { - if let Err(err) = compact_loop_ext(tenantid) { - error!("compact loop terminated with error: {:?}", err); - Err(err) - } else { - Ok(()) - } -} - -fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { - loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let compaction_period = repo.get_compaction_period(); - - std::thread::sleep(compaction_period); - trace!("compaction thread for tenant {} waking up", tenantid); - - // Compact timelines - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - repo.compaction_iteration()?; - } - - trace!( - "compaction thread stopped for tenant {} state is {:?}", - tenantid, - tenant_mgr::get_tenant_state(tenantid) - ); - Ok(()) -} - -/// -/// GC thread's main loop -/// -pub fn gc_loop(tenantid: ZTenantId) -> Result<()> { - loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - - trace!("gc thread for tenant {} waking up", tenantid); - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let gc_horizon = repo.get_gc_horizon(); - // Garbage collect old files that are not needed for PITR anymore - if gc_horizon > 0 { - repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; - } - - // TODO Write it in more adequate way using - // condvar.wait_timeout() or something - let mut sleep_time = repo.get_gc_period().as_secs(); - while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active) - { - sleep_time -= 1; - std::thread::sleep(Duration::from_secs(1)); - } - } - trace!( - "GC thread stopped for tenant {} state is {:?}", - tenantid, - tenant_mgr::get_tenant_state(tenantid) - ); - Ok(()) -} diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index 6e4bc1a787..ab0d894c70 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -94,11 +94,8 @@ pub enum ThreadKind { // Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL. WalReceiverManager, - // Thread that handles compaction of all timelines for a tenant. - Compactor, - - // Thread that handles GC of a tenant - GarbageCollector, + // Thread that schedules new compaction and gc jobs + TenantTaskManager, // Thread that flushes frozen in-memory layers to disk LayerFlushThread, diff --git a/test_runner/batch_others/test_tenant_tasks.py b/test_runner/batch_others/test_tenant_tasks.py new file mode 100644 index 0000000000..9ce2016a64 --- /dev/null +++ b/test_runner/batch_others/test_tenant_tasks.py @@ -0,0 +1,70 @@ +from fixtures.neon_fixtures import NeonEnvBuilder, wait_until +from uuid import UUID +import time + + +def get_only_element(l): + assert len(l) == 1 + return l[0] + + +# Test that gc and compaction tenant tasks start and stop correctly +def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): + # The gc and compaction loops don't bother to watch for tenant state + # changes while sleeping, so we use small periods to make this test + # run faster. With default settings we'd have to wait longer for tasks + # to notice state changes and shut down. + # TODO fix this behavior in the pageserver + tenant_config = "{gc_period = '1 s', compaction_period = '1 s'}" + neon_env_builder.pageserver_config_override = f"tenant_config={tenant_config}" + name = "test_tenant_tasks" + env = neon_env_builder.init_start() + client = env.pageserver.http_client() + + def get_state(tenant): + all_states = client.tenant_list() + matching = [t for t in all_states if t["id"] == tenant.hex] + return get_only_element(matching)["state"] + + def get_metric_value(name): + metrics = client.get_metrics() + relevant = [line for line in metrics.splitlines() if line.startswith(name)] + if len(relevant) == 0: + return 0 + line = get_only_element(relevant) + value = line.lstrip(name).strip() + return int(value) + + def detach_all_timelines(tenant): + timelines = [UUID(t["timeline_id"]) for t in client.timeline_list(tenant)] + for t in timelines: + client.timeline_detach(tenant, t) + + def assert_idle(tenant): + assert get_state(tenant) == "Idle" + + # Create tenant, start compute + tenant, _ = env.neon_cli.create_tenant() + timeline = env.neon_cli.create_timeline(name, tenant_id=tenant) + pg = env.postgres.create_start(name, tenant_id=tenant) + assert (get_state(tenant) == "Active") + + # Stop compute + pg.stop() + + # Detach all tenants and wait for them to go idle + # TODO they should be already idle since there are no active computes + for tenant_info in client.tenant_list(): + tenant_id = UUID(tenant_info["id"]) + detach_all_timelines(tenant_id) + wait_until(10, 0.2, lambda: assert_idle(tenant_id)) + + # Assert that all tasks finish quickly after tenants go idle + def assert_tasks_finish(): + tasks_started = get_metric_value('pageserver_tenant_task_events{event="start"}') + tasks_ended = get_metric_value('pageserver_tenant_task_events{event="stop"}') + tasks_panicked = get_metric_value('pageserver_tenant_task_events{event="panic"}') + assert tasks_started == tasks_ended + assert tasks_panicked == 0 + + wait_until(10, 0.2, assert_tasks_finish) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 93efc7d5d2..8df4878039 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2143,7 +2143,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post assert (mismatch, error) == ([], []) -def wait_until(number_of_iterations: int, interval: int, func): +def wait_until(number_of_iterations: int, interval: float, func): """ Wait until 'func' returns successfully, without exception. Returns the last return value from the the function.