mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
9 Commits
hack/compu
...
ps-thread-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d84c4907a9 | ||
|
|
77c35906f5 | ||
|
|
b70cf3f7b1 | ||
|
|
304641afe5 | ||
|
|
13ed1e32b4 | ||
|
|
858ce6a8b5 | ||
|
|
491a1870ce | ||
|
|
30376d87ac | ||
|
|
887b4248d2 |
53
pageserver/src/jobs/chore.rs
Normal file
53
pageserver/src/jobs/chore.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use std::{marker::PhantomData, ops::Range, time::{Duration, Instant}};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
|
||||
|
||||
pub trait Job: std::fmt::Debug + Send + Copy + Clone + PartialEq + Eq + 'static {
|
||||
type ErrorType: AsRef<dyn std::error::Error + 'static>;
|
||||
fn run(&self) -> Result<(), Self::ErrorType>;
|
||||
}
|
||||
|
||||
pub enum Schedule {
|
||||
Every(Duration),
|
||||
}
|
||||
|
||||
/// A job that repeats on a schedule
|
||||
pub struct Chore<J: Job> {
|
||||
pub job: J,
|
||||
pub schedule: Schedule,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub struct ChoreHandle<J: Job> {
|
||||
_marker: PhantomData<J>,
|
||||
chore_id: u64,
|
||||
}
|
||||
|
||||
pub enum Status<J: Job> {
|
||||
Scheduled(Instant),
|
||||
Error(J::ErrorType),
|
||||
}
|
||||
|
||||
pub trait Scheduler<J: Job> {
|
||||
fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J>;
|
||||
fn remove_chore(&self, ch: ChoreHandle<J>);
|
||||
fn get_status(&self, ch: ChoreHandle<J>) -> Status<J>;
|
||||
}
|
||||
|
||||
pub struct SimpleScheduler<J: Job> {
|
||||
_marker: PhantomData<J>,
|
||||
}
|
||||
impl<J: Job> Scheduler<J> for SimpleScheduler<J> {
|
||||
fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn remove_chore(&self, ch: ChoreHandle<J>) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self, ch: ChoreHandle<J>) -> Status<J> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
33
pageserver/src/jobs/compaction.rs
Normal file
33
pageserver/src/jobs/compaction.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use once_cell::sync::OnceCell;
|
||||
use utils::zid::ZTenantId;
|
||||
use tracing::*;
|
||||
use crate::repository::Repository;
|
||||
|
||||
use crate::tenant_mgr::{self, TenantState};
|
||||
|
||||
use super::chore::{Job, SimpleScheduler};
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct CompactionJob {
|
||||
pub tenant: ZTenantId,
|
||||
}
|
||||
|
||||
impl Job for CompactionJob {
|
||||
type ErrorType = anyhow::Error;
|
||||
|
||||
fn run(&self) -> Result<(), Self::ErrorType> {
|
||||
// TODO why not kill the chore when tenant is not active?
|
||||
// TODO GC has the same code too
|
||||
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
|
||||
repo.compaction_iteration()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub static COMPACTION_SCHEDULER: OnceCell<SimpleScheduler<CompactionJob>> = OnceCell::new();
|
||||
35
pageserver/src/jobs/gc.rs
Normal file
35
pageserver/src/jobs/gc.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use once_cell::sync::OnceCell;
|
||||
use utils::zid::ZTenantId;
|
||||
use tracing::*;
|
||||
use crate::repository::Repository;
|
||||
|
||||
use crate::tenant_mgr::{self, TenantState};
|
||||
|
||||
use super::chore::{Job, SimpleScheduler};
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct GcJob {
|
||||
pub tenant: ZTenantId,
|
||||
}
|
||||
|
||||
impl Job for GcJob {
|
||||
type ErrorType = anyhow::Error;
|
||||
|
||||
fn run(&self) -> Result<(), Self::ErrorType> {
|
||||
// TODO why not kill the chore when tenant is not active?
|
||||
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
if gc_horizon > 0 {
|
||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub static GC_SCHEDULER: OnceCell<SimpleScheduler<GcJob>> = OnceCell::new();
|
||||
5
pageserver/src/jobs/mod.rs
Normal file
5
pageserver/src/jobs/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod scheduler;
|
||||
pub mod worker;
|
||||
pub mod chore;
|
||||
pub mod gc;
|
||||
pub mod compaction;
|
||||
240
pageserver/src/jobs/scheduler.rs
Normal file
240
pageserver/src/jobs/scheduler.rs
Normal file
@@ -0,0 +1,240 @@
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::ops::Add;
|
||||
use tokio::time::{Duration, Instant};
|
||||
|
||||
use crate::thread_mgr::shutdown_watcher;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc::{Receiver, Sender, channel};
|
||||
use tokio::time::{sleep, sleep_until};
|
||||
|
||||
use super::worker::{Job, Worker, Report};
|
||||
|
||||
pub struct Spawner<J: Job> {
|
||||
send_worker: Sender<Worker<J>>,
|
||||
send_report: Sender<Report<J>>,
|
||||
}
|
||||
|
||||
impl<J: Job> Spawner<J> {
|
||||
pub fn spawn_worker(&self) {
|
||||
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
|
||||
|
||||
let enlist = self.send_worker.clone();
|
||||
let report = self.send_report.clone();
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcWorker,
|
||||
None,
|
||||
None,
|
||||
"gc_worker_1",
|
||||
true,
|
||||
move || {
|
||||
run_worker(enlist, report)
|
||||
},
|
||||
).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Status {
|
||||
Scheduled,
|
||||
Running,
|
||||
Stuck,
|
||||
}
|
||||
|
||||
pub struct Scheduler<J: Job> {
|
||||
send_work: Sender<J>,
|
||||
recv_report: Receiver<Report<J>>,
|
||||
|
||||
pub period: Duration,
|
||||
pub chores: Mutex<HashMap<J, Status>>,
|
||||
}
|
||||
|
||||
impl<J: Job> Scheduler<J> {
|
||||
pub async fn handle_report(&mut self, report: Report<J>) {
|
||||
let job = report.for_job;
|
||||
match report.result {
|
||||
Ok(()) => {
|
||||
// Reschedule job to run again
|
||||
if let Some(status) = self.chores.lock().await.get_mut(&job) {
|
||||
*status = Status::Scheduled;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
// Remember error that got job stuck
|
||||
println!("task panicked");
|
||||
if let Some(status) = self.chores.lock().await.get_mut(&job) {
|
||||
*status = Status::Stuck;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(mut self) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async {
|
||||
let mut next_iteration = Instant::now();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown_watcher() => break,
|
||||
_ = sleep_until(next_iteration) => {
|
||||
next_iteration = Instant::now().add(self.period);
|
||||
for (job, status) in self.chores.lock().await.iter_mut() {
|
||||
if matches!(status, Status::Scheduled) {
|
||||
self.send_work.send(job.clone()).await.unwrap();
|
||||
*status = Status::Running;
|
||||
}
|
||||
}
|
||||
}
|
||||
report = self.recv_report.recv() => {
|
||||
let report = report.expect("report channel closed");
|
||||
self.handle_report(report).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Board<J: Job> {
|
||||
workers: Vec<Worker<J>>,
|
||||
jobs: VecDeque<J>,
|
||||
recv_work: Receiver<J>,
|
||||
recv_worker: Receiver<Worker<J>>,
|
||||
}
|
||||
|
||||
impl<J: Job> Board<J> {
|
||||
pub fn new() -> (Board<J>, Spawner<J>, Scheduler<J>) {
|
||||
let worker = channel::<Worker<J>>(100);
|
||||
let work = channel::<J>(100);
|
||||
let report = channel::<Report<J>>(100);
|
||||
|
||||
let board = Board {
|
||||
workers: vec![],
|
||||
jobs: VecDeque::new(),
|
||||
recv_worker: worker.1,
|
||||
recv_work: work.1,
|
||||
};
|
||||
let spawner = Spawner {
|
||||
send_worker: worker.0,
|
||||
send_report: report.0,
|
||||
};
|
||||
let scheduler = Scheduler {
|
||||
send_work: work.0,
|
||||
recv_report: report.1,
|
||||
period: Duration::from_millis(10),
|
||||
chores: Mutex::new(HashMap::new()),
|
||||
};
|
||||
|
||||
(board, spawner, scheduler)
|
||||
}
|
||||
|
||||
|
||||
pub async fn handle_job(&mut self, job: J) {
|
||||
// Assign to a worker if any are availabe
|
||||
while let Some(w) = self.workers.pop() {
|
||||
if let Ok(()) = w.0.send(job.clone()).await {
|
||||
return;
|
||||
}
|
||||
}
|
||||
self.jobs.push_back(job);
|
||||
}
|
||||
|
||||
pub async fn handle_worker(&mut self, worker: Worker<J>) {
|
||||
// Assign jobs if any are queued
|
||||
if let Some(j) = self.jobs.pop_front() {
|
||||
worker.0.send(j).await.ok();
|
||||
} else {
|
||||
self.workers.push(worker);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(mut self) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown_watcher() => break,
|
||||
worker = self.recv_worker.recv() => {
|
||||
let worker = worker.expect("worker channel closed");
|
||||
self.handle_worker(worker).await;
|
||||
},
|
||||
job = self.recv_work.recv() => {
|
||||
let job = job.expect("job channel closed");
|
||||
self.handle_job(job).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::thread_mgr::{self, ThreadKind};
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
struct PrintJob {
|
||||
to_print: String
|
||||
}
|
||||
|
||||
impl Job for PrintJob {
|
||||
fn run(&self) {
|
||||
println!("{}", self.to_print);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sched_1() {
|
||||
let (board, spawner, scheduler) = Board::<PrintJob>::new();
|
||||
|
||||
// Schedule recurring job
|
||||
let j = PrintJob {
|
||||
to_print: "hello from job".to_string(),
|
||||
};
|
||||
scheduler.chores.lock().await.insert(j, Status::Scheduled);
|
||||
|
||||
// Spawn board
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcScheduler,
|
||||
None,
|
||||
None,
|
||||
"gc_scheduler",
|
||||
true,
|
||||
move || {
|
||||
board.run()
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
// Spawn scheduler
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcScheduler,
|
||||
None,
|
||||
None,
|
||||
"gc_scheduler",
|
||||
true,
|
||||
move || {
|
||||
scheduler.run()
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
// Spawn worker
|
||||
spawner.spawn_worker();
|
||||
|
||||
// Wait for job to run a few times
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Cleanup
|
||||
thread_mgr::shutdown_threads(None, None, None);
|
||||
}
|
||||
}
|
||||
108
pageserver/src/jobs/worker.rs
Normal file
108
pageserver/src/jobs/worker.rs
Normal file
@@ -0,0 +1,108 @@
|
||||
//!
|
||||
//! Worker thread that can be used in a thread pool to process jobs.
|
||||
//!
|
||||
use crate::thread_mgr::shutdown_watcher;
|
||||
use tokio::sync::mpsc::{Sender, channel};
|
||||
use std::any::Any;
|
||||
use std::hash::Hash;
|
||||
use std::panic::AssertUnwindSafe;
|
||||
use std::panic::catch_unwind;
|
||||
|
||||
pub trait Job: std::fmt::Debug + Send + 'static + Clone + PartialEq + Eq + Hash {
|
||||
fn run(&self);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Worker<J: Job>(pub Sender<J>);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Report<J: Job> {
|
||||
pub for_job: J,
|
||||
pub result: Result<(), Box<dyn Any + Send>>
|
||||
}
|
||||
|
||||
pub fn run_worker<J: Job>(enlist: Sender<Worker<J>>, report: Sender<Report<J>>) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async {
|
||||
loop {
|
||||
let (send_work, mut get_work) = channel::<J>(100);
|
||||
enlist.send(Worker(send_work)).await.unwrap();
|
||||
|
||||
let shutdown_watcher = shutdown_watcher();
|
||||
tokio::select! {
|
||||
_ = shutdown_watcher => break,
|
||||
j = get_work.recv() => {
|
||||
if let Some(job) = j {
|
||||
let result = catch_unwind(AssertUnwindSafe(|| {
|
||||
job.run();
|
||||
}));
|
||||
report.send(Report {
|
||||
for_job: job,
|
||||
result: result,
|
||||
}).await.unwrap();
|
||||
} else {
|
||||
// channel closed
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::thread_mgr::{self, ThreadKind};
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
struct PrintJob {
|
||||
to_print: String
|
||||
}
|
||||
|
||||
impl Job for PrintJob {
|
||||
fn run(&self) {
|
||||
println!("{}", self.to_print);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_1() {
|
||||
let mut worker = channel::<Worker<PrintJob>>(100);
|
||||
let mut result = channel::<Report<PrintJob>>(100);
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcWorker,
|
||||
None,
|
||||
None,
|
||||
"gc_worker_1",
|
||||
true,
|
||||
move || {
|
||||
run_worker(worker.0, result.0)
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
let j = PrintJob {
|
||||
to_print: "hello from job".to_string(),
|
||||
};
|
||||
let w = worker.1.recv().await.unwrap();
|
||||
w.0.send(j.clone()).await.unwrap();
|
||||
|
||||
println!("waiting for result");
|
||||
let report = result.1.recv().await.unwrap();
|
||||
assert_eq!(j, report.for_job);
|
||||
println!("got result");
|
||||
|
||||
thread_mgr::shutdown_threads(None, None, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn worker_cancellation() {
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,6 @@ pub mod repository;
|
||||
pub mod storage_sync;
|
||||
pub mod tenant_config;
|
||||
pub mod tenant_mgr;
|
||||
pub mod tenant_threads;
|
||||
pub mod thread_mgr;
|
||||
pub mod timelines;
|
||||
pub mod virtual_file;
|
||||
@@ -21,6 +20,7 @@ pub mod walingest;
|
||||
pub mod walreceiver;
|
||||
pub mod walrecord;
|
||||
pub mod walredo;
|
||||
pub mod jobs;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use tracing::info;
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
//! page server.
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::jobs::chore::{Chore, ChoreHandle, Schedule, Scheduler};
|
||||
use crate::jobs::compaction::{COMPACTION_SCHEDULER, CompactionJob};
|
||||
use crate::jobs::gc::{GC_SCHEDULER, GcJob};
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::pgdatadir_mapping::DatadirTimeline;
|
||||
use crate::repository::{Repository, TimelineSyncStatusUpdate};
|
||||
@@ -21,6 +24,7 @@ use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
|
||||
use utils::zid::{ZTenantId, ZTimelineId};
|
||||
@@ -71,7 +75,7 @@ pub enum TenantState {
|
||||
//Ready,
|
||||
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||
// The local disk might have some newer files that don't exist in cloud storage yet.
|
||||
Active,
|
||||
Active(ChoreHandle<GcJob>, ChoreHandle<CompactionJob>),
|
||||
// Tenant is active, but there is no walreceiver connection.
|
||||
Idle,
|
||||
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||
@@ -83,7 +87,7 @@ pub enum TenantState {
|
||||
impl fmt::Display for TenantState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
TenantState::Active => f.write_str("Active"),
|
||||
TenantState::Active(_, _) => f.write_str("Active"),
|
||||
TenantState::Idle => f.write_str("Idle"),
|
||||
TenantState::Stopping => f.write_str("Stopping"),
|
||||
}
|
||||
@@ -236,35 +240,26 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
|
||||
|
||||
match tenant.state {
|
||||
// If the tenant is already active, nothing to do.
|
||||
TenantState::Active => {}
|
||||
TenantState::Active(_, _) => {}
|
||||
|
||||
// If it's Idle, launch the compactor and GC threads
|
||||
TenantState::Idle => {
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::Compactor,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"Compactor thread",
|
||||
false,
|
||||
move || crate::tenant_threads::compact_loop(tenant_id),
|
||||
)?;
|
||||
let repo = crate::tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
|
||||
let gc_spawn_result = thread_mgr::spawn(
|
||||
ThreadKind::GarbageCollector,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"GC thread",
|
||||
false,
|
||||
move || crate::tenant_threads::gc_loop(tenant_id),
|
||||
)
|
||||
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
|
||||
let compaction_chore_handle = COMPACTION_SCHEDULER.get().unwrap().add_chore(Chore {
|
||||
job: CompactionJob {
|
||||
tenant: tenant_id,
|
||||
},
|
||||
schedule: Schedule::Every(repo.get_compaction_period()),
|
||||
});
|
||||
|
||||
if let Err(e) = &gc_spawn_result {
|
||||
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||
return gc_spawn_result;
|
||||
}
|
||||
tenant.state = TenantState::Active;
|
||||
let gc_chore_handle = GC_SCHEDULER.get().unwrap().add_chore(Chore {
|
||||
job: GcJob {
|
||||
tenant: tenant_id,
|
||||
},
|
||||
schedule: Schedule::Every(repo.get_gc_period()),
|
||||
});
|
||||
tenant.state = TenantState::Active(gc_chore_handle, compaction_chore_handle);
|
||||
}
|
||||
|
||||
TenantState::Stopping => {
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
use crate::repository::Repository;
|
||||
use crate::tenant_mgr;
|
||||
use crate::tenant_mgr::TenantState;
|
||||
use anyhow::Result;
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
use utils::zid::ZTenantId;
|
||||
|
||||
///
|
||||
/// Compaction thread's main loop
|
||||
///
|
||||
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
if let Err(err) = compact_loop_ext(tenantid) {
|
||||
error!("compact loop terminated with error: {:?}", err);
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let compaction_period = repo.get_compaction_period();
|
||||
|
||||
std::thread::sleep(compaction_period);
|
||||
trace!("compaction thread for tenant {} waking up", tenantid);
|
||||
|
||||
// Compact timelines
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
repo.compaction_iteration()?;
|
||||
}
|
||||
|
||||
trace!(
|
||||
"compaction thread stopped for tenant {} state is {:?}",
|
||||
tenantid,
|
||||
tenant_mgr::get_tenant_state(tenantid)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// GC thread's main loop
|
||||
///
|
||||
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
|
||||
loop {
|
||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
||||
break;
|
||||
}
|
||||
|
||||
trace!("gc thread for tenant {} waking up", tenantid);
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let gc_horizon = repo.get_gc_horizon();
|
||||
// Garbage collect old files that are not needed for PITR anymore
|
||||
if gc_horizon > 0 {
|
||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
||||
}
|
||||
|
||||
// TODO Write it in more adequate way using
|
||||
// condvar.wait_timeout() or something
|
||||
let mut sleep_time = repo.get_gc_period().as_secs();
|
||||
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
|
||||
{
|
||||
sleep_time -= 1;
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
trace!(
|
||||
"GC thread stopped for tenant {} state is {:?}",
|
||||
tenantid,
|
||||
tenant_mgr::get_tenant_state(tenantid)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -97,6 +97,12 @@ pub enum ThreadKind {
|
||||
// Thread that handles compaction of all timelines for a tenant.
|
||||
Compactor,
|
||||
|
||||
// Thread that schedules GC tasks
|
||||
GcScheduler,
|
||||
|
||||
// Thread that works on GC tasks
|
||||
GcWorker,
|
||||
|
||||
// Thread that handles GC of a tenant
|
||||
GarbageCollector,
|
||||
|
||||
|
||||
Reference in New Issue
Block a user