mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-05 11:40:37 +00:00
Compare commits
9 Commits
layer_comp
...
ps-thread-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d84c4907a9 | ||
|
|
77c35906f5 | ||
|
|
b70cf3f7b1 | ||
|
|
304641afe5 | ||
|
|
13ed1e32b4 | ||
|
|
858ce6a8b5 | ||
|
|
491a1870ce | ||
|
|
30376d87ac | ||
|
|
887b4248d2 |
53
pageserver/src/jobs/chore.rs
Normal file
53
pageserver/src/jobs/chore.rs
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
use std::{marker::PhantomData, ops::Range, time::{Duration, Instant}};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
pub trait Job: std::fmt::Debug + Send + Copy + Clone + PartialEq + Eq + 'static {
|
||||||
|
type ErrorType: AsRef<dyn std::error::Error + 'static>;
|
||||||
|
fn run(&self) -> Result<(), Self::ErrorType>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum Schedule {
|
||||||
|
Every(Duration),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A job that repeats on a schedule
|
||||||
|
pub struct Chore<J: Job> {
|
||||||
|
pub job: J,
|
||||||
|
pub schedule: Schedule,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
|
||||||
|
pub struct ChoreHandle<J: Job> {
|
||||||
|
_marker: PhantomData<J>,
|
||||||
|
chore_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum Status<J: Job> {
|
||||||
|
Scheduled(Instant),
|
||||||
|
Error(J::ErrorType),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Scheduler<J: Job> {
|
||||||
|
fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J>;
|
||||||
|
fn remove_chore(&self, ch: ChoreHandle<J>);
|
||||||
|
fn get_status(&self, ch: ChoreHandle<J>) -> Status<J>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SimpleScheduler<J: Job> {
|
||||||
|
_marker: PhantomData<J>,
|
||||||
|
}
|
||||||
|
impl<J: Job> Scheduler<J> for SimpleScheduler<J> {
|
||||||
|
fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_chore(&self, ch: ChoreHandle<J>) {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_status(&self, ch: ChoreHandle<J>) -> Status<J> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
33
pageserver/src/jobs/compaction.rs
Normal file
33
pageserver/src/jobs/compaction.rs
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
use utils::zid::ZTenantId;
|
||||||
|
use tracing::*;
|
||||||
|
use crate::repository::Repository;
|
||||||
|
|
||||||
|
use crate::tenant_mgr::{self, TenantState};
|
||||||
|
|
||||||
|
use super::chore::{Job, SimpleScheduler};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub struct CompactionJob {
|
||||||
|
pub tenant: ZTenantId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for CompactionJob {
|
||||||
|
type ErrorType = anyhow::Error;
|
||||||
|
|
||||||
|
fn run(&self) -> Result<(), Self::ErrorType> {
|
||||||
|
// TODO why not kill the chore when tenant is not active?
|
||||||
|
// TODO GC has the same code too
|
||||||
|
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
|
||||||
|
repo.compaction_iteration()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub static COMPACTION_SCHEDULER: OnceCell<SimpleScheduler<CompactionJob>> = OnceCell::new();
|
||||||
35
pageserver/src/jobs/gc.rs
Normal file
35
pageserver/src/jobs/gc.rs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
use utils::zid::ZTenantId;
|
||||||
|
use tracing::*;
|
||||||
|
use crate::repository::Repository;
|
||||||
|
|
||||||
|
use crate::tenant_mgr::{self, TenantState};
|
||||||
|
|
||||||
|
use super::chore::{Job, SimpleScheduler};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub struct GcJob {
|
||||||
|
pub tenant: ZTenantId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for GcJob {
|
||||||
|
type ErrorType = anyhow::Error;
|
||||||
|
|
||||||
|
fn run(&self) -> Result<(), Self::ErrorType> {
|
||||||
|
// TODO why not kill the chore when tenant is not active?
|
||||||
|
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
|
||||||
|
let gc_horizon = repo.get_gc_horizon();
|
||||||
|
if gc_horizon > 0 {
|
||||||
|
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub static GC_SCHEDULER: OnceCell<SimpleScheduler<GcJob>> = OnceCell::new();
|
||||||
5
pageserver/src/jobs/mod.rs
Normal file
5
pageserver/src/jobs/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
pub mod scheduler;
|
||||||
|
pub mod worker;
|
||||||
|
pub mod chore;
|
||||||
|
pub mod gc;
|
||||||
|
pub mod compaction;
|
||||||
240
pageserver/src/jobs/scheduler.rs
Normal file
240
pageserver/src/jobs/scheduler.rs
Normal file
@@ -0,0 +1,240 @@
|
|||||||
|
use std::collections::{HashMap, VecDeque};
|
||||||
|
use std::ops::Add;
|
||||||
|
use tokio::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use crate::thread_mgr::shutdown_watcher;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::sync::mpsc::{Receiver, Sender, channel};
|
||||||
|
use tokio::time::{sleep, sleep_until};
|
||||||
|
|
||||||
|
use super::worker::{Job, Worker, Report};
|
||||||
|
|
||||||
|
pub struct Spawner<J: Job> {
|
||||||
|
send_worker: Sender<Worker<J>>,
|
||||||
|
send_report: Sender<Report<J>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<J: Job> Spawner<J> {
|
||||||
|
pub fn spawn_worker(&self) {
|
||||||
|
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
|
||||||
|
|
||||||
|
let enlist = self.send_worker.clone();
|
||||||
|
let report = self.send_report.clone();
|
||||||
|
thread_mgr::spawn(
|
||||||
|
ThreadKind::GcWorker,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
"gc_worker_1",
|
||||||
|
true,
|
||||||
|
move || {
|
||||||
|
run_worker(enlist, report)
|
||||||
|
},
|
||||||
|
).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Status {
|
||||||
|
Scheduled,
|
||||||
|
Running,
|
||||||
|
Stuck,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Scheduler<J: Job> {
|
||||||
|
send_work: Sender<J>,
|
||||||
|
recv_report: Receiver<Report<J>>,
|
||||||
|
|
||||||
|
pub period: Duration,
|
||||||
|
pub chores: Mutex<HashMap<J, Status>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<J: Job> Scheduler<J> {
|
||||||
|
pub async fn handle_report(&mut self, report: Report<J>) {
|
||||||
|
let job = report.for_job;
|
||||||
|
match report.result {
|
||||||
|
Ok(()) => {
|
||||||
|
// Reschedule job to run again
|
||||||
|
if let Some(status) = self.chores.lock().await.get_mut(&job) {
|
||||||
|
*status = Status::Scheduled;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// Remember error that got job stuck
|
||||||
|
println!("task panicked");
|
||||||
|
if let Some(status) = self.chores.lock().await.get_mut(&job) {
|
||||||
|
*status = Status::Stuck;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(mut self) -> anyhow::Result<()> {
|
||||||
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
runtime.block_on(async {
|
||||||
|
let mut next_iteration = Instant::now();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_watcher() => break,
|
||||||
|
_ = sleep_until(next_iteration) => {
|
||||||
|
next_iteration = Instant::now().add(self.period);
|
||||||
|
for (job, status) in self.chores.lock().await.iter_mut() {
|
||||||
|
if matches!(status, Status::Scheduled) {
|
||||||
|
self.send_work.send(job.clone()).await.unwrap();
|
||||||
|
*status = Status::Running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
report = self.recv_report.recv() => {
|
||||||
|
let report = report.expect("report channel closed");
|
||||||
|
self.handle_report(report).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Board<J: Job> {
|
||||||
|
workers: Vec<Worker<J>>,
|
||||||
|
jobs: VecDeque<J>,
|
||||||
|
recv_work: Receiver<J>,
|
||||||
|
recv_worker: Receiver<Worker<J>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<J: Job> Board<J> {
|
||||||
|
pub fn new() -> (Board<J>, Spawner<J>, Scheduler<J>) {
|
||||||
|
let worker = channel::<Worker<J>>(100);
|
||||||
|
let work = channel::<J>(100);
|
||||||
|
let report = channel::<Report<J>>(100);
|
||||||
|
|
||||||
|
let board = Board {
|
||||||
|
workers: vec![],
|
||||||
|
jobs: VecDeque::new(),
|
||||||
|
recv_worker: worker.1,
|
||||||
|
recv_work: work.1,
|
||||||
|
};
|
||||||
|
let spawner = Spawner {
|
||||||
|
send_worker: worker.0,
|
||||||
|
send_report: report.0,
|
||||||
|
};
|
||||||
|
let scheduler = Scheduler {
|
||||||
|
send_work: work.0,
|
||||||
|
recv_report: report.1,
|
||||||
|
period: Duration::from_millis(10),
|
||||||
|
chores: Mutex::new(HashMap::new()),
|
||||||
|
};
|
||||||
|
|
||||||
|
(board, spawner, scheduler)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub async fn handle_job(&mut self, job: J) {
|
||||||
|
// Assign to a worker if any are availabe
|
||||||
|
while let Some(w) = self.workers.pop() {
|
||||||
|
if let Ok(()) = w.0.send(job.clone()).await {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.jobs.push_back(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_worker(&mut self, worker: Worker<J>) {
|
||||||
|
// Assign jobs if any are queued
|
||||||
|
if let Some(j) = self.jobs.pop_front() {
|
||||||
|
worker.0.send(j).await.ok();
|
||||||
|
} else {
|
||||||
|
self.workers.push(worker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(mut self) -> anyhow::Result<()> {
|
||||||
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
runtime.block_on(async {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_watcher() => break,
|
||||||
|
worker = self.recv_worker.recv() => {
|
||||||
|
let worker = worker.expect("worker channel closed");
|
||||||
|
self.handle_worker(worker).await;
|
||||||
|
},
|
||||||
|
job = self.recv_work.recv() => {
|
||||||
|
let job = job.expect("job channel closed");
|
||||||
|
self.handle_job(job).await;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::thread_mgr::{self, ThreadKind};
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||||
|
struct PrintJob {
|
||||||
|
to_print: String
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for PrintJob {
|
||||||
|
fn run(&self) {
|
||||||
|
println!("{}", self.to_print);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn sched_1() {
|
||||||
|
let (board, spawner, scheduler) = Board::<PrintJob>::new();
|
||||||
|
|
||||||
|
// Schedule recurring job
|
||||||
|
let j = PrintJob {
|
||||||
|
to_print: "hello from job".to_string(),
|
||||||
|
};
|
||||||
|
scheduler.chores.lock().await.insert(j, Status::Scheduled);
|
||||||
|
|
||||||
|
// Spawn board
|
||||||
|
thread_mgr::spawn(
|
||||||
|
ThreadKind::GcScheduler,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
"gc_scheduler",
|
||||||
|
true,
|
||||||
|
move || {
|
||||||
|
board.run()
|
||||||
|
},
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
// Spawn scheduler
|
||||||
|
thread_mgr::spawn(
|
||||||
|
ThreadKind::GcScheduler,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
"gc_scheduler",
|
||||||
|
true,
|
||||||
|
move || {
|
||||||
|
scheduler.run()
|
||||||
|
},
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
// Spawn worker
|
||||||
|
spawner.spawn_worker();
|
||||||
|
|
||||||
|
// Wait for job to run a few times
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
thread_mgr::shutdown_threads(None, None, None);
|
||||||
|
}
|
||||||
|
}
|
||||||
108
pageserver/src/jobs/worker.rs
Normal file
108
pageserver/src/jobs/worker.rs
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
//!
|
||||||
|
//! Worker thread that can be used in a thread pool to process jobs.
|
||||||
|
//!
|
||||||
|
use crate::thread_mgr::shutdown_watcher;
|
||||||
|
use tokio::sync::mpsc::{Sender, channel};
|
||||||
|
use std::any::Any;
|
||||||
|
use std::hash::Hash;
|
||||||
|
use std::panic::AssertUnwindSafe;
|
||||||
|
use std::panic::catch_unwind;
|
||||||
|
|
||||||
|
pub trait Job: std::fmt::Debug + Send + 'static + Clone + PartialEq + Eq + Hash {
|
||||||
|
fn run(&self);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Worker<J: Job>(pub Sender<J>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Report<J: Job> {
|
||||||
|
pub for_job: J,
|
||||||
|
pub result: Result<(), Box<dyn Any + Send>>
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_worker<J: Job>(enlist: Sender<Worker<J>>, report: Sender<Report<J>>) -> anyhow::Result<()> {
|
||||||
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
runtime.block_on(async {
|
||||||
|
loop {
|
||||||
|
let (send_work, mut get_work) = channel::<J>(100);
|
||||||
|
enlist.send(Worker(send_work)).await.unwrap();
|
||||||
|
|
||||||
|
let shutdown_watcher = shutdown_watcher();
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_watcher => break,
|
||||||
|
j = get_work.recv() => {
|
||||||
|
if let Some(job) = j {
|
||||||
|
let result = catch_unwind(AssertUnwindSafe(|| {
|
||||||
|
job.run();
|
||||||
|
}));
|
||||||
|
report.send(Report {
|
||||||
|
for_job: job,
|
||||||
|
result: result,
|
||||||
|
}).await.unwrap();
|
||||||
|
} else {
|
||||||
|
// channel closed
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::thread_mgr::{self, ThreadKind};
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||||
|
struct PrintJob {
|
||||||
|
to_print: String
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for PrintJob {
|
||||||
|
fn run(&self) {
|
||||||
|
println!("{}", self.to_print);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn worker_1() {
|
||||||
|
let mut worker = channel::<Worker<PrintJob>>(100);
|
||||||
|
let mut result = channel::<Report<PrintJob>>(100);
|
||||||
|
|
||||||
|
thread_mgr::spawn(
|
||||||
|
ThreadKind::GcWorker,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
"gc_worker_1",
|
||||||
|
true,
|
||||||
|
move || {
|
||||||
|
run_worker(worker.0, result.0)
|
||||||
|
},
|
||||||
|
).unwrap();
|
||||||
|
|
||||||
|
let j = PrintJob {
|
||||||
|
to_print: "hello from job".to_string(),
|
||||||
|
};
|
||||||
|
let w = worker.1.recv().await.unwrap();
|
||||||
|
w.0.send(j.clone()).await.unwrap();
|
||||||
|
|
||||||
|
println!("waiting for result");
|
||||||
|
let report = result.1.recv().await.unwrap();
|
||||||
|
assert_eq!(j, report.for_job);
|
||||||
|
println!("got result");
|
||||||
|
|
||||||
|
thread_mgr::shutdown_threads(None, None, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn worker_cancellation() {
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,7 +13,6 @@ pub mod repository;
|
|||||||
pub mod storage_sync;
|
pub mod storage_sync;
|
||||||
pub mod tenant_config;
|
pub mod tenant_config;
|
||||||
pub mod tenant_mgr;
|
pub mod tenant_mgr;
|
||||||
pub mod tenant_threads;
|
|
||||||
pub mod thread_mgr;
|
pub mod thread_mgr;
|
||||||
pub mod timelines;
|
pub mod timelines;
|
||||||
pub mod virtual_file;
|
pub mod virtual_file;
|
||||||
@@ -21,6 +20,7 @@ pub mod walingest;
|
|||||||
pub mod walreceiver;
|
pub mod walreceiver;
|
||||||
pub mod walrecord;
|
pub mod walrecord;
|
||||||
pub mod walredo;
|
pub mod walredo;
|
||||||
|
pub mod jobs;
|
||||||
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|||||||
@@ -2,6 +2,9 @@
|
|||||||
//! page server.
|
//! page server.
|
||||||
|
|
||||||
use crate::config::PageServerConf;
|
use crate::config::PageServerConf;
|
||||||
|
use crate::jobs::chore::{Chore, ChoreHandle, Schedule, Scheduler};
|
||||||
|
use crate::jobs::compaction::{COMPACTION_SCHEDULER, CompactionJob};
|
||||||
|
use crate::jobs::gc::{GC_SCHEDULER, GcJob};
|
||||||
use crate::layered_repository::LayeredRepository;
|
use crate::layered_repository::LayeredRepository;
|
||||||
use crate::pgdatadir_mapping::DatadirTimeline;
|
use crate::pgdatadir_mapping::DatadirTimeline;
|
||||||
use crate::repository::{Repository, TimelineSyncStatusUpdate};
|
use crate::repository::{Repository, TimelineSyncStatusUpdate};
|
||||||
@@ -21,6 +24,7 @@ use std::collections::hash_map::Entry;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
|
|
||||||
use utils::zid::{ZTenantId, ZTimelineId};
|
use utils::zid::{ZTenantId, ZTimelineId};
|
||||||
@@ -71,7 +75,7 @@ pub enum TenantState {
|
|||||||
//Ready,
|
//Ready,
|
||||||
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||||
// The local disk might have some newer files that don't exist in cloud storage yet.
|
// The local disk might have some newer files that don't exist in cloud storage yet.
|
||||||
Active,
|
Active(ChoreHandle<GcJob>, ChoreHandle<CompactionJob>),
|
||||||
// Tenant is active, but there is no walreceiver connection.
|
// Tenant is active, but there is no walreceiver connection.
|
||||||
Idle,
|
Idle,
|
||||||
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
// This tenant exists on local disk, and the layer map has been loaded into memory.
|
||||||
@@ -83,7 +87,7 @@ pub enum TenantState {
|
|||||||
impl fmt::Display for TenantState {
|
impl fmt::Display for TenantState {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
TenantState::Active => f.write_str("Active"),
|
TenantState::Active(_, _) => f.write_str("Active"),
|
||||||
TenantState::Idle => f.write_str("Idle"),
|
TenantState::Idle => f.write_str("Idle"),
|
||||||
TenantState::Stopping => f.write_str("Stopping"),
|
TenantState::Stopping => f.write_str("Stopping"),
|
||||||
}
|
}
|
||||||
@@ -236,35 +240,26 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
match tenant.state {
|
match tenant.state {
|
||||||
// If the tenant is already active, nothing to do.
|
// If the tenant is already active, nothing to do.
|
||||||
TenantState::Active => {}
|
TenantState::Active(_, _) => {}
|
||||||
|
|
||||||
// If it's Idle, launch the compactor and GC threads
|
// If it's Idle, launch the compactor and GC threads
|
||||||
TenantState::Idle => {
|
TenantState::Idle => {
|
||||||
thread_mgr::spawn(
|
let repo = crate::tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||||
ThreadKind::Compactor,
|
|
||||||
Some(tenant_id),
|
|
||||||
None,
|
|
||||||
"Compactor thread",
|
|
||||||
false,
|
|
||||||
move || crate::tenant_threads::compact_loop(tenant_id),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let gc_spawn_result = thread_mgr::spawn(
|
let compaction_chore_handle = COMPACTION_SCHEDULER.get().unwrap().add_chore(Chore {
|
||||||
ThreadKind::GarbageCollector,
|
job: CompactionJob {
|
||||||
Some(tenant_id),
|
tenant: tenant_id,
|
||||||
None,
|
},
|
||||||
"GC thread",
|
schedule: Schedule::Every(repo.get_compaction_period()),
|
||||||
false,
|
});
|
||||||
move || crate::tenant_threads::gc_loop(tenant_id),
|
|
||||||
)
|
|
||||||
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
|
|
||||||
|
|
||||||
if let Err(e) = &gc_spawn_result {
|
let gc_chore_handle = GC_SCHEDULER.get().unwrap().add_chore(Chore {
|
||||||
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
|
job: GcJob {
|
||||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
tenant: tenant_id,
|
||||||
return gc_spawn_result;
|
},
|
||||||
}
|
schedule: Schedule::Every(repo.get_gc_period()),
|
||||||
tenant.state = TenantState::Active;
|
});
|
||||||
|
tenant.state = TenantState::Active(gc_chore_handle, compaction_chore_handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
TenantState::Stopping => {
|
TenantState::Stopping => {
|
||||||
|
|||||||
@@ -1,79 +0,0 @@
|
|||||||
//! This module contains functions to serve per-tenant background processes,
|
|
||||||
//! such as compaction and GC
|
|
||||||
use crate::repository::Repository;
|
|
||||||
use crate::tenant_mgr;
|
|
||||||
use crate::tenant_mgr::TenantState;
|
|
||||||
use anyhow::Result;
|
|
||||||
use std::time::Duration;
|
|
||||||
use tracing::*;
|
|
||||||
use utils::zid::ZTenantId;
|
|
||||||
|
|
||||||
///
|
|
||||||
/// Compaction thread's main loop
|
|
||||||
///
|
|
||||||
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
|
|
||||||
if let Err(err) = compact_loop_ext(tenantid) {
|
|
||||||
error!("compact loop terminated with error: {:?}", err);
|
|
||||||
Err(err)
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
|
|
||||||
loop {
|
|
||||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
|
||||||
let compaction_period = repo.get_compaction_period();
|
|
||||||
|
|
||||||
std::thread::sleep(compaction_period);
|
|
||||||
trace!("compaction thread for tenant {} waking up", tenantid);
|
|
||||||
|
|
||||||
// Compact timelines
|
|
||||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
|
||||||
repo.compaction_iteration()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
"compaction thread stopped for tenant {} state is {:?}",
|
|
||||||
tenantid,
|
|
||||||
tenant_mgr::get_tenant_state(tenantid)
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
///
|
|
||||||
/// GC thread's main loop
|
|
||||||
///
|
|
||||||
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
|
|
||||||
loop {
|
|
||||||
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("gc thread for tenant {} waking up", tenantid);
|
|
||||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
|
||||||
let gc_horizon = repo.get_gc_horizon();
|
|
||||||
// Garbage collect old files that are not needed for PITR anymore
|
|
||||||
if gc_horizon > 0 {
|
|
||||||
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO Write it in more adequate way using
|
|
||||||
// condvar.wait_timeout() or something
|
|
||||||
let mut sleep_time = repo.get_gc_period().as_secs();
|
|
||||||
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
|
|
||||||
{
|
|
||||||
sleep_time -= 1;
|
|
||||||
std::thread::sleep(Duration::from_secs(1));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
trace!(
|
|
||||||
"GC thread stopped for tenant {} state is {:?}",
|
|
||||||
tenantid,
|
|
||||||
tenant_mgr::get_tenant_state(tenantid)
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -97,6 +97,12 @@ pub enum ThreadKind {
|
|||||||
// Thread that handles compaction of all timelines for a tenant.
|
// Thread that handles compaction of all timelines for a tenant.
|
||||||
Compactor,
|
Compactor,
|
||||||
|
|
||||||
|
// Thread that schedules GC tasks
|
||||||
|
GcScheduler,
|
||||||
|
|
||||||
|
// Thread that works on GC tasks
|
||||||
|
GcWorker,
|
||||||
|
|
||||||
// Thread that handles GC of a tenant
|
// Thread that handles GC of a tenant
|
||||||
GarbageCollector,
|
GarbageCollector,
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user