Compare commits

...

28 Commits

Author SHA1 Message Date
Bojan Serafimov
802ef85cfb Add repo state 2022-06-28 10:34:37 -04:00
Bojan Serafimov
2b4c3cb932 Handle errors on shutdown 2022-06-28 09:29:59 -04:00
Bojan Serafimov
bf76f43ea4 Wait for tasks to complete 2022-06-27 17:54:31 -04:00
Bojan Serafimov
98062865f4 Handle errors inside loop 2022-06-24 17:08:44 -04:00
Bojan Serafimov
cdc81996b4 Add metrics 2022-06-24 15:28:53 -04:00
Bojan Serafimov
1169e9ea4c Rename threads to tasks 2022-06-24 11:08:55 -04:00
Bojan Serafimov
3a23869780 Log errors 2022-06-24 11:05:07 -04:00
Bojan Serafimov
796ee4d8af Instrument the task, not the await 2022-06-24 10:29:42 -04:00
Bojan Serafimov
b31ce411d2 Remove unnecessary map_err 2022-06-24 09:50:14 -04:00
Bojan Serafimov
24a5bd10a0 Add cancellation 2022-06-23 23:25:48 -04:00
Bojan Serafimov
2c029d9803 Remove redundant error context 2022-06-23 22:32:15 -04:00
Bojan Serafimov
763b00ccee Merge branch 'tenant-tasks' of github.com:neondatabase/neon into tenant-tasks 2022-06-23 14:15:14 -04:00
Bojan Serafimov
c44c8a0ea0 Add comment 2022-06-23 14:14:39 -04:00
Bojan Serafimov
692496d733 Cancel tasks 2022-06-23 13:20:53 -04:00
Bojan Serafimov
0f4552a544 Update TODO 2022-06-23 12:59:20 -04:00
Bojan Serafimov
d7d4cc8c77 Error instead of panic 2022-06-23 12:56:21 -04:00
Bojan Serafimov
9aab1d0f2b Expand blocking scope 2022-06-22 16:04:42 -04:00
Bojan Serafimov
83dc93ab0f Merge branch 'main' into tenant-tasks 2022-06-22 14:49:45 -04:00
Bojan Serafimov
9a9a58d52c Fmt 2022-06-15 09:49:07 -04:00
Bojan Serafimov
865e8740a7 Add docs 2022-06-15 09:35:41 -04:00
Bojan Serafimov
1a5d1a15d0 Use tokio sleep instead 2022-06-15 09:33:56 -04:00
Bojan Serafimov
02a9883f0f Add TenantTaskManager 2022-06-15 09:24:46 -04:00
Bojan Serafimov
ee36ca54d5 Run compaction as task 2022-06-15 09:16:57 -04:00
Bojan Serafimov
36cc6d2928 Fmt 2022-06-14 15:07:36 -04:00
Bojan Serafimov
e1a4c06918 Fix init 2022-06-14 15:05:30 -04:00
Bojan Serafimov
ec4528505e Simplify 2022-06-14 12:38:09 -04:00
Bojan Serafimov
c79e72e835 Add runtime 2022-06-14 11:08:17 -04:00
Bojan Serafimov
a1f85715ac WIP 2022-06-14 09:39:55 -04:00
8 changed files with 331 additions and 130 deletions

View File

@@ -263,6 +263,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// start profiler (if enabled)
let profiler_guard = profiling::init_profiler(conf);
pageserver::tenant_tasks::init_tenant_task_pool()?;
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,

View File

@@ -39,7 +39,8 @@ use crate::storage_sync::index::RemoteIndex;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::repository::{
GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter,
GcResult, RepoIoError, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate,
TimelineWriter,
};
use crate::repository::{Key, Value};
use crate::tenant_mgr;
@@ -158,6 +159,11 @@ pub struct LayeredRepository {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
// Freezing the repo disallows any writes to its directory.
//
// Any writers must hold frozen.try_read() while writing.
pub frozen: RwLock<bool>,
// Overridden tenant-specific config parameters.
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
@@ -319,19 +325,38 @@ impl Repository for LayeredRepository {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
) -> Result<GcResult, RepoIoError> {
let timeline_str = target_timelineid
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
// Make sure repo is not frozen
let guard = match self.frozen.try_read() {
Ok(g) => g,
Err(_) => return Err(RepoIoError::RepoFreezingError),
};
if *guard {
return Err(RepoIoError::RepoFrozenError);
}
STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
})
.map_err(|err| err.into())
}
fn compaction_iteration(&self) -> Result<()> {
fn compaction_iteration(&self) -> Result<(), RepoIoError> {
// Make sure repo is not frozen
let guard = match self.frozen.try_read() {
Ok(g) => g,
Err(_) => return Err(RepoIoError::RepoFreezingError),
};
if *guard {
return Err(RepoIoError::RepoFrozenError);
}
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -685,6 +710,7 @@ impl LayeredRepository {
) -> LayeredRepository {
LayeredRepository {
tenant_id,
frozen: RwLock::new(false),
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),

View File

@@ -13,7 +13,7 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod tenant_tasks;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -196,6 +196,19 @@ impl Display for TimelineSyncStatusUpdate {
}
}
#[derive(Debug, thiserror::Error)]
pub enum RepoIoError {
#[error("Cannot write to repo path while repo is frozen")]
RepoFrozenError,
#[error("Cannot write to repo path while repo is being frozen")]
RepoFreezingError,
/// Unstructured anyhow error
#[error(transparent)]
Other(#[from] anyhow::Error),
}
///
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
@@ -252,13 +265,13 @@ pub trait Repository: Send + Sync {
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
) -> Result<GcResult, RepoIoError>;
/// Perform one compaction iteration.
/// This function is periodically called by compactor thread.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
fn compaction_iteration(&self) -> Result<()>;
fn compaction_iteration(&self) -> Result<(), RepoIoError>;
/// detaches timeline-related in-memory data.
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;

View File

@@ -230,8 +230,6 @@ pub fn shutdown_all_tenants() {
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
@@ -330,44 +328,15 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
let compactor_spawn_result = thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
);
if compactor_spawn_result.is_err() {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
// Unfreeze the repo, allowing gc/compaction jobs to run
let repo = get_repository_for_tenant(tenant_id)?;
*repo.frozen.write().unwrap() = false;
if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
// Spawn gc and compaction loops. The loops will shut themselves
// down when tenant becomes idle and repo becomes frozen.
crate::tenant_tasks::start_compaction_loop(tenant_id)?;
crate::tenant_tasks::start_gc_loop(tenant_id)?;
}
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");
@@ -379,8 +348,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
Some(tenant_id),
None,
);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
// Freeze the repo, waiting for existing gc/compaction to finish
let repo = get_repository_for_tenant(tenant_id)?;
*repo.frozen.write().unwrap() = true;
}
}

View File

@@ -0,0 +1,271 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;
use crate::repository::{RepoIoError, Repository};
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::*;
use utils::zid::ZTenantId;
static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"tenant_task_events",
"Number of task start/stop/fail events.",
&["event"],
)
.expect("Failed to register tenant_task_events metric")
});
///
/// Compaction task's main loop
///
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
match repo.compaction_iteration() {
Ok(_) => Ok(ControlFlow::Continue(compaction_period)),
Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
Ok(ControlFlow::Break(()))
}
Err(RepoIoError::Other(e)) => Err(e),
}
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Compaction failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Compaction join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"compaction loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}
static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
static START_COMPACTION_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
/// Spawn a task that will periodically schedule garbage collection until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_GC_LOOP
.get()
.context("Failed to get START_GC_LOOP")?
.blocking_send(tenantid)
.context("Failed to send to START_GC_LOOP channel")?;
Ok(())
}
/// Spawn a task that will periodically schedule compaction until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_COMPACTION_LOOP
.get()
.context("failed to get START_COMPACTION_LOOP")?
.blocking_send(tenantid)
.context("failed to send to START_COMPACTION_LOOP")?;
Ok(())
}
/// Spawn the TenantTaskManager
/// This needs to be called before start_gc_loop or start_compaction_loop
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
START_GC_LOOP
.set(gc_send)
.expect("Failed to set START_GC_LOOP");
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
START_COMPACTION_LOOP
.set(compaction_send)
.expect("Failed to set START_COMPACTION_LOOP");
// TODO this is getting repetitive
let mut gc_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
let mut compaction_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
thread_mgr::spawn(
ThreadKind::TenantTaskManager,
None,
None,
"Tenant task manager main thread",
true,
move || {
runtime.block_on(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => {
// Send cancellation to all tasks
for (_, cancel) in gc_loops.drain() {
cancel.send(()).ok();
}
for (_, cancel) in compaction_loops.drain() {
cancel.send(()).ok();
}
// Exit after all tasks finish
while let Some(result) = futures.next().await {
match result {
Ok(()) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Err(e) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
}
}
break;
},
tenantid = gc_recv.recv() => {
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
.instrument(trace_span!("gc loop", tenant = %tenantid)));
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
tenantid = compaction_recv.recv() => {
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
.instrument(trace_span!("compaction loop", tenant = %tenantid)));
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
result = futures.next() => {
// Log and count any unhandled panics
match result {
Some(Ok(())) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Some(Err(e)) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
None => {},
};
},
}
}
});
Ok(())
},
)?;
Ok(())
}
///
/// GC task's main loop
///
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_period = repo.get_gc_period();
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
match repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) {
Ok(_) => return Ok(ControlFlow::Continue(gc_period)),
Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
return Ok(ControlFlow::Break(()))
}
Err(RepoIoError::Other(e)) => return Err(e),
}
}
Ok(ControlFlow::Continue(gc_period))
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Gc failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Gc join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"GC loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}

View File

@@ -94,11 +94,8 @@ pub enum ThreadKind {
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
WalReceiverManager,
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Thread that handles GC of a tenant
GarbageCollector,
// Thread that schedules new compaction and gc jobs
TenantTaskManager,
// Thread that flushes frozen in-memory layers to disk
LayerFlushThread,