Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-12 15:10:38 +00:00

Compare commits: arpad/azur...tenant-tas
28 Commits
| SHA1 |
|---|
| 802ef85cfb |
| 2b4c3cb932 |
| bf76f43ea4 |
| 98062865f4 |
| cdc81996b4 |
| 1169e9ea4c |
| 3a23869780 |
| 796ee4d8af |
| b31ce411d2 |
| 24a5bd10a0 |
| 2c029d9803 |
| 763b00ccee |
| c44c8a0ea0 |
| 692496d733 |
| 0f4552a544 |
| d7d4cc8c77 |
| 9aab1d0f2b |
| 83dc93ab0f |
| 9a9a58d52c |
| 865e8740a7 |
| 1a5d1a15d0 |
| 02a9883f0f |
| ee36ca54d5 |
| 36cc6d2928 |
| e1a4c06918 |
| ec4528505e |
| c79e72e835 |
| a1f85715ac |
@@ -263,6 +263,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
     // start profiler (if enabled)
     let profiler_guard = profiling::init_profiler(conf);

+    pageserver::tenant_tasks::init_tenant_task_pool()?;
+
     // initialize authentication for incoming connections
     let auth = match &conf.auth_type {
         AuthType::Trust | AuthType::MD5 => None,
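The call above must run before any tenant is activated: start_gc_loop and start_compaction_loop (added later in this diff) look up global channel senders that only init_tenant_task_pool populates. A minimal sketch of that ordering constraint, using std's mpsc and once_cell rather than the pageserver's tokio-based types; all names here are illustrative, not pageserver code:

use once_cell::sync::OnceCell;
use std::sync::mpsc;

// Illustrative stand-in for START_GC_LOOP / START_COMPACTION_LOOP.
static START_LOOP: OnceCell<mpsc::Sender<u64>> = OnceCell::new();

// Stand-in for init_tenant_task_pool(): publishes the sender.
fn init_task_pool() -> mpsc::Receiver<u64> {
    let (send, recv) = mpsc::channel();
    START_LOOP.set(send).expect("init called twice");
    recv // the real code hands the receiving end to a manager task
}

// Stand-in for start_gc_loop(): fails if init has not run yet.
fn start_loop(tenant: u64) -> Result<(), &'static str> {
    START_LOOP
        .get()
        .ok_or("task pool was not initialized first")?
        .send(tenant)
        .map_err(|_| "manager is gone")
}

fn main() {
    assert!(start_loop(1).is_err()); // before init: error
    let _recv = init_task_pool();
    assert!(start_loop(1).is_ok()); // after init: request is queued
}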
@@ -39,7 +39,8 @@ use crate::storage_sync::index::RemoteIndex;
 use crate::tenant_config::{TenantConf, TenantConfOpt};

 use crate::repository::{
-    GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter,
+    GcResult, RepoIoError, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate,
+    TimelineWriter,
 };
 use crate::repository::{Key, Value};
 use crate::tenant_mgr;
@@ -158,6 +159,11 @@ pub struct LayeredRepository {
     // Global pageserver config parameters
     pub conf: &'static PageServerConf,

+    // Freezing the repo disallows any writes to its directory.
+    //
+    // Any writers must hold frozen.try_read() while writing.
+    pub frozen: RwLock<bool>,
+
     // Overridden tenant-specific config parameters.
     // We keep TenantConfOpt struct here to preserve the information
     // about parameters that are not set.
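The frozen field is an RwLock used as a gate rather than as shared data: background jobs hold a read guard for the duration of their work, and the freezer takes the write lock, which cannot be acquired until every in-flight job drops its guard. A minimal self-contained sketch of the idiom; Repo, run_job, and the error strings are illustrative, not pageserver code:

use std::sync::RwLock;

struct Repo {
    frozen: RwLock<bool>,
}

impl Repo {
    // Stand-in for gc_iteration/compaction_iteration's preamble.
    fn run_job(&self) -> Result<(), &'static str> {
        // Non-blocking: if the freezer already holds the write lock,
        // report "being frozen" rather than waiting on it.
        let guard = self
            .frozen
            .try_read()
            .map_err(|_| "repo is being frozen")?;
        if *guard {
            return Err("repo is frozen");
        }
        // ... the actual job runs here, still holding `guard` ...
        Ok(())
    }

    // Stand-in for the Idle -> Stopping transition: write() blocks until
    // every in-flight job has dropped its read guard, then flips the flag.
    fn freeze(&self) {
        *self.frozen.write().unwrap() = true;
    }
}

fn main() {
    let repo = Repo { frozen: RwLock::new(false) };
    assert!(repo.run_job().is_ok());
    repo.freeze();
    assert_eq!(repo.run_job(), Err("repo is frozen"));
}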
@@ -319,19 +325,38 @@ impl Repository for LayeredRepository {
         horizon: u64,
         pitr: Duration,
         checkpoint_before_gc: bool,
-    ) -> Result<GcResult> {
+    ) -> Result<GcResult, RepoIoError> {
         let timeline_str = target_timelineid
             .map(|x| x.to_string())
             .unwrap_or_else(|| "-".to_string());

+        // Make sure repo is not frozen
+        let guard = match self.frozen.try_read() {
+            Ok(g) => g,
+            Err(_) => return Err(RepoIoError::RepoFreezingError),
+        };
+        if *guard {
+            return Err(RepoIoError::RepoFrozenError);
+        }
+
         STORAGE_TIME
             .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
             .observe_closure_duration(|| {
                 self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc)
             })
+            .map_err(|err| err.into())
     }

-    fn compaction_iteration(&self) -> Result<()> {
+    fn compaction_iteration(&self) -> Result<(), RepoIoError> {
+        // Make sure repo is not frozen
+        let guard = match self.frozen.try_read() {
+            Ok(g) => g,
+            Err(_) => return Err(RepoIoError::RepoFreezingError),
+        };
+        if *guard {
+            return Err(RepoIoError::RepoFrozenError);
+        }
+
         // Scan through the hashmap and collect a list of all the timelines,
         // while holding the lock. Then drop the lock and actually perform the
         // compactions. We don't want to block everything else while the
@@ -685,6 +710,7 @@ impl LayeredRepository {
     ) -> LayeredRepository {
         LayeredRepository {
             tenant_id,
+            frozen: RwLock::new(false),
             conf,
             tenant_conf: Arc::new(RwLock::new(tenant_conf)),
             timelines: Mutex::new(HashMap::new()),
@@ -13,7 +13,7 @@ pub mod repository;
 pub mod storage_sync;
 pub mod tenant_config;
 pub mod tenant_mgr;
-pub mod tenant_threads;
+pub mod tenant_tasks;
 pub mod thread_mgr;
 pub mod timelines;
 pub mod virtual_file;
@@ -196,6 +196,19 @@ impl Display for TimelineSyncStatusUpdate {
     }
 }

+#[derive(Debug, thiserror::Error)]
+pub enum RepoIoError {
+    #[error("Cannot write to repo path while repo is frozen")]
+    RepoFrozenError,
+
+    #[error("Cannot write to repo path while repo is being frozen")]
+    RepoFreezingError,
+
+    /// Unstructured anyhow error
+    #[error(transparent)]
+    Other(#[from] anyhow::Error),
+}
+
 ///
 /// A repository corresponds to one .neon directory. One repository holds multiple
 /// timelines, forked off from the same initial call to 'initdb'.
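The Other(#[from] anyhow::Error) variant is what makes the .map_err(|err| err.into()) call in gc_iteration above compile: thiserror generates a From<anyhow::Error> impl, so both ? and .into() lift unstructured errors into RepoIoError while the two freeze variants remain individually matchable. A trimmed-down sketch; compaction_iteration here is a free-standing stand-in, not the real trait method:

use anyhow::Context;

#[derive(Debug, thiserror::Error)]
enum RepoIoError {
    #[error("Cannot write to repo path while repo is frozen")]
    RepoFrozenError,
    #[error("Cannot write to repo path while repo is being frozen")]
    RepoFreezingError,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

// Illustrative stand-in for a repository method with the new signature.
fn compaction_iteration() -> Result<(), RepoIoError> {
    // `?` applies the generated From<anyhow::Error> impl here, which is
    // exactly what .map_err(|err| err.into()) does in gc_iteration above.
    let _bytes = std::fs::read("layer-file").context("failed to read layer file")?;
    Ok(())
}

fn main() {
    match compaction_iteration() {
        Ok(()) => println!("done"),
        // Freeze errors are an expected stop signal, not a failure:
        Err(RepoIoError::RepoFrozenError | RepoIoError::RepoFreezingError) => {
            println!("repo frozen; stopping")
        }
        Err(RepoIoError::Other(e)) => eprintln!("compaction failed: {e:#}"),
    }
}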
@@ -252,13 +265,13 @@ pub trait Repository: Send + Sync {
         horizon: u64,
         pitr: Duration,
         checkpoint_before_gc: bool,
-    ) -> Result<GcResult>;
+    ) -> Result<GcResult, RepoIoError>;

     /// Perform one compaction iteration.
     /// This function is periodically called by compactor thread.
     /// Also it can be explicitly requested per timeline through page server
     /// api's 'compact' command.
-    fn compaction_iteration(&self) -> Result<()>;
+    fn compaction_iteration(&self) -> Result<(), RepoIoError>;

     /// detaches timeline-related in-memory data.
     fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
@@ -230,8 +230,6 @@ pub fn shutdown_all_tenants() {
     drop(m);

     thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
-    thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
-    thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);

     // Ok, no background threads running anymore. Flush any remaining data in
     // memory to disk.
@@ -330,44 +328,15 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
         }
         (TenantState::Idle, TenantState::Active) => {
             info!("activating tenant {tenant_id}");
-            let compactor_spawn_result = thread_mgr::spawn(
-                ThreadKind::Compactor,
-                Some(tenant_id),
-                None,
-                "Compactor thread",
-                false,
-                move || crate::tenant_threads::compact_loop(tenant_id),
-            );
-            if compactor_spawn_result.is_err() {
-                let mut m = tenants_state::write_tenants();
-                m.get_mut(&tenant_id)
-                    .with_context(|| format!("Tenant not found for id {tenant_id}"))?
-                    .state = old_state;
-                drop(m);
-            }
-            compactor_spawn_result?;
+            // Unfreeze the repo, allowing gc/compaction jobs to run
+            let repo = get_repository_for_tenant(tenant_id)?;
+            *repo.frozen.write().unwrap() = false;

-            let gc_spawn_result = thread_mgr::spawn(
-                ThreadKind::GarbageCollector,
-                Some(tenant_id),
-                None,
-                "GC thread",
-                false,
-                move || crate::tenant_threads::gc_loop(tenant_id),
-            )
-            .map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
-            .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
-
-            if let Err(e) = &gc_spawn_result {
-                let mut m = tenants_state::write_tenants();
-                m.get_mut(&tenant_id)
-                    .with_context(|| format!("Tenant not found for id {tenant_id}"))?
-                    .state = old_state;
-                drop(m);
-                error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
-                thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
-                return gc_spawn_result;
-            }
+            // Spawn gc and compaction loops. The loops will shut themselves
+            // down when tenant becomes idle and repo becomes frozen.
+            crate::tenant_tasks::start_compaction_loop(tenant_id)?;
+            crate::tenant_tasks::start_gc_loop(tenant_id)?;
         }
         (TenantState::Idle, TenantState::Stopping) => {
             info!("stopping idle tenant {tenant_id}");
@@ -379,8 +348,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
                 Some(tenant_id),
                 None,
             );
-            thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
-            thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
+
+            // Freeze the repo, waiting for existing gc/compaction to finish
+            let repo = get_repository_for_tenant(tenant_id)?;
+            *repo.frozen.write().unwrap() = true;
         }
     }
pageserver/src/tenant_tasks.rs (new file, 271 lines)
@@ -0,0 +1,271 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC

use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;

use crate::repository::{RepoIoError, Repository};
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::*;
use utils::zid::ZTenantId;

static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "tenant_task_events",
        "Number of task start/stop/fail events.",
        &["event"],
    )
    .expect("Failed to register tenant_task_events metric")
});

///
/// Compaction task's main loop
///
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
    loop {
        trace!("waking up");

        // Run blocking part of the task
        let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
            let compaction_period = repo.get_compaction_period();
            match repo.compaction_iteration() {
                Ok(_) => Ok(ControlFlow::Continue(compaction_period)),
                Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
                    Ok(ControlFlow::Break(()))
                }
                Err(RepoIoError::Other(e)) => Err(e),
            }
        })
        .await;

        // Decide whether to sleep or break
        let sleep_duration = match period {
            Ok(Ok(ControlFlow::Continue(period))) => period,
            Ok(Ok(ControlFlow::Break(()))) => break,
            Ok(Err(e)) => {
                error!("Compaction failed, retrying: {}", e);
                Duration::from_secs(2)
            }
            Err(e) => {
                error!("Compaction join error, retrying: {}", e);
                Duration::from_secs(2)
            }
        };

        // Sleep
        tokio::select! {
            _ = cancel.changed() => {
                trace!("received cancellation request");
                break;
            },
            _ = tokio::time::sleep(sleep_duration) => {},
        }
    }

    trace!(
        "compaction loop stopped. State is {:?}",
        tenant_mgr::get_tenant_state(tenantid)
    );
}

static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
static START_COMPACTION_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();

/// Spawn a task that will periodically schedule garbage collection until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
    START_GC_LOOP
        .get()
        .context("Failed to get START_GC_LOOP")?
        .blocking_send(tenantid)
        .context("Failed to send to START_GC_LOOP channel")?;
    Ok(())
}

/// Spawn a task that will periodically schedule compaction until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
    START_COMPACTION_LOOP
        .get()
        .context("failed to get START_COMPACTION_LOOP")?
        .blocking_send(tenantid)
        .context("failed to send to START_COMPACTION_LOOP")?;
    Ok(())
}

/// Spawn the TenantTaskManager
/// This needs to be called before start_gc_loop or start_compaction_loop
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .thread_name("tenant-task-worker")
        .worker_threads(40) // Way more than necessary
        .max_blocking_threads(100) // Way more than necessary
        .enable_all()
        .build()?;

    let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
    START_GC_LOOP
        .set(gc_send)
        .expect("Failed to set START_GC_LOOP");

    let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
    START_COMPACTION_LOOP
        .set(compaction_send)
        .expect("Failed to set START_COMPACTION_LOOP");

    // TODO this is getting repetitive
    let mut gc_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
    let mut compaction_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();

    thread_mgr::spawn(
        ThreadKind::TenantTaskManager,
        None,
        None,
        "Tenant task manager main thread",
        true,
        move || {
            runtime.block_on(async move {
                let mut futures = FuturesUnordered::new();
                loop {
                    tokio::select! {
                        _ = thread_mgr::shutdown_watcher() => {
                            // Send cancellation to all tasks
                            for (_, cancel) in gc_loops.drain() {
                                cancel.send(()).ok();
                            }
                            for (_, cancel) in compaction_loops.drain() {
                                cancel.send(()).ok();
                            }

                            // Exit after all tasks finish
                            while let Some(result) = futures.next().await {
                                match result {
                                    Ok(()) => {
                                        TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
                                    },
                                    Err(e) => {
                                        TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
                                        error!("loop join error {}", e)
                                    },
                                }
                            }
                            break;
                        },
                        tenantid = gc_recv.recv() => {
                            let tenantid = tenantid.expect("Gc task channel closed unexpectedly");

                            // Spawn new task, request cancellation of the old one if exists
                            let (cancel_send, cancel_recv) = watch::channel(());
                            let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
                                .instrument(trace_span!("gc loop", tenant = %tenantid)));
                            if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
                                old_cancel_send.send(()).ok();
                            }

                            // Update metrics, remember handle
                            TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
                            futures.push(handle);
                        },
                        tenantid = compaction_recv.recv() => {
                            let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");

                            // Spawn new task, request cancellation of the old one if exists
                            let (cancel_send, cancel_recv) = watch::channel(());
                            let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
                                .instrument(trace_span!("compaction loop", tenant = %tenantid)));
                            if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
                                old_cancel_send.send(()).ok();
                            }

                            // Update metrics, remember handle
                            TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
                            futures.push(handle);
                        },
                        result = futures.next() => {
                            // Log and count any unhandled panics
                            match result {
                                Some(Ok(())) => {
                                    TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
                                },
                                Some(Err(e)) => {
                                    TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
                                    error!("loop join error {}", e)
                                },
                                None => {},
                            };
                        },
                    }
                }
            });
            Ok(())
        },
    )?;

    Ok(())
}

///
/// GC task's main loop
///
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
    loop {
        trace!("waking up");

        // Run blocking part of the task
        let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
            let gc_period = repo.get_gc_period();
            let gc_horizon = repo.get_gc_horizon();
            if gc_horizon > 0 {
                match repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) {
                    Ok(_) => return Ok(ControlFlow::Continue(gc_period)),
                    Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => {
                        return Ok(ControlFlow::Break(()))
                    }
                    Err(RepoIoError::Other(e)) => return Err(e),
                }
            }
            Ok(ControlFlow::Continue(gc_period))
        })
        .await;

        // Decide whether to sleep or break
        let sleep_duration = match period {
            Ok(Ok(ControlFlow::Continue(period))) => period,
            Ok(Ok(ControlFlow::Break(()))) => break,
            Ok(Err(e)) => {
                error!("Gc failed, retrying: {}", e);
                Duration::from_secs(2)
            }
            Err(e) => {
                error!("Gc join error, retrying: {}", e);
                Duration::from_secs(2)
            }
        };

        // Sleep
        tokio::select! {
            _ = cancel.changed() => {
                trace!("received cancellation request");
                break;
            },
            _ = tokio::time::sleep(sleep_duration) => {},
        }
    }
    trace!(
        "GC loop stopped. State is {:?}",
        tenant_mgr::get_tenant_state(tenantid)
    );
}
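Both loops above share one shape: run the blocking repository work on tokio's blocking pool, return ControlFlow to distinguish "sleep and repeat" from "stop" (for example when the repo is frozen), then race the sleep against a watch-channel cancellation signal. A minimal self-contained sketch of that shape, with do_blocking_step standing in for compaction_iteration/gc_iteration:

use std::ops::ControlFlow;
use std::time::Duration;
use tokio::sync::watch;

// Stand-in for the blocking repository step; pretends the step succeeded
// and asked to run again in one second.
fn do_blocking_step() -> anyhow::Result<ControlFlow<(), Duration>> {
    Ok(ControlFlow::Continue(Duration::from_secs(1)))
}

async fn task_loop(mut cancel: watch::Receiver<()>) {
    loop {
        // Run the blocking step off the async worker threads.
        let step = tokio::task::spawn_blocking(do_blocking_step).await;

        let sleep_duration = match step {
            Ok(Ok(ControlFlow::Continue(period))) => period,
            Ok(Ok(ControlFlow::Break(()))) => break, // e.g. repo frozen
            Ok(Err(e)) => {
                eprintln!("step failed, retrying: {e}");
                Duration::from_secs(2)
            }
            Err(e) => {
                eprintln!("join error, retrying: {e}");
                Duration::from_secs(2)
            }
        };

        // Race the sleep against cancellation.
        tokio::select! {
            _ = cancel.changed() => break, // manager asked us to stop
            _ = tokio::time::sleep(sleep_duration) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (cancel_send, cancel_recv) = watch::channel(());
    let handle = tokio::spawn(task_loop(cancel_recv));
    tokio::time::sleep(Duration::from_millis(2500)).await;
    cancel_send.send(()).ok(); // request cancellation mid-sleep
    handle.await.unwrap();
}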
pageserver/src/tenant_threads.rs (deleted file, 79 lines)
@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;

///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
    if let Err(err) = compact_loop_ext(tenantid) {
        error!("compact loop terminated with error: {:?}", err);
        Err(err)
    } else {
        Ok(())
    }
}

fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
    loop {
        if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
            break;
        }
        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
        let compaction_period = repo.get_compaction_period();

        std::thread::sleep(compaction_period);
        trace!("compaction thread for tenant {} waking up", tenantid);

        // Compact timelines
        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
        repo.compaction_iteration()?;
    }

    trace!(
        "compaction thread stopped for tenant {} state is {:?}",
        tenantid,
        tenant_mgr::get_tenant_state(tenantid)
    );
    Ok(())
}

///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
    loop {
        if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
            break;
        }

        trace!("gc thread for tenant {} waking up", tenantid);
        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
        let gc_horizon = repo.get_gc_horizon();
        // Garbage collect old files that are not needed for PITR anymore
        if gc_horizon > 0 {
            repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
        }

        // TODO Write it in more adequate way using
        // condvar.wait_timeout() or something
        let mut sleep_time = repo.get_gc_period().as_secs();
        while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
        {
            sleep_time -= 1;
            std::thread::sleep(Duration::from_secs(1));
        }
    }
    trace!(
        "GC thread stopped for tenant {} state is {:?}",
        tenantid,
        tenant_mgr::get_tenant_state(tenantid)
    );
    Ok(())
}
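The deleted gc_loop made its sleep interruptible by polling tenant state once per second; its own TODO suggests condvar.wait_timeout() instead. A sketch of what that suggestion could look like (StopSignal is illustrative, not part of the codebase): the thread sleeps the full period but wakes immediately when stopped.

use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

struct StopSignal {
    stop: Mutex<bool>,
    cond: Condvar,
}

impl StopSignal {
    // Returns true if we were told to stop, false if the period elapsed.
    fn wait(&self, period: Duration) -> bool {
        let guard = self.stop.lock().unwrap();
        // Wait until either the flag flips or the timeout expires.
        let (guard, _timeout) = self
            .cond
            .wait_timeout_while(guard, period, |stop| !*stop)
            .unwrap();
        *guard
    }

    fn stop(&self) {
        *self.stop.lock().unwrap() = true;
        self.cond.notify_all();
    }
}

fn main() {
    let signal = Arc::new(StopSignal {
        stop: Mutex::new(false),
        cond: Condvar::new(),
    });
    let s = Arc::clone(&signal);
    let worker = std::thread::spawn(move || {
        while !s.wait(Duration::from_secs(10)) {
            // one gc iteration would run here
        }
    });
    std::thread::sleep(Duration::from_millis(100));
    signal.stop(); // wakes the sleeper immediately, no 1-second polling
    worker.join().unwrap();
}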
@@ -94,11 +94,8 @@ pub enum ThreadKind {
     // Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
     WalReceiverManager,

-    // Thread that handles compaction of all timelines for a tenant.
-    Compactor,
-
-    // Thread that handles GC of a tenant
-    GarbageCollector,
+    // Thread that schedules new compaction and gc jobs
+    TenantTaskManager,

     // Thread that flushes frozen in-memory layers to disk
     LayerFlushThread,