Add gc pool

This commit is contained in:
Bojan Serafimov
2022-05-23 23:42:25 -04:00
parent dc47f9ccf1
commit 0d5fee3ab7
4 changed files with 104 additions and 21 deletions

View File

@@ -0,0 +1,35 @@
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::repository::Repository;
use crate::tenant_mgr::{self, TenantState};
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompactionJob {
pub tenant: ZTenantId,
}
impl Job for CompactionJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<(), Self::ErrorType> {
// TODO GC has the same code too
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) {
// TODO Maybe record this as "didn't run"?
return Ok(());
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
repo.compaction_iteration()?;
Ok(())
}
}
pub static COMPACTION_SCHEDULER: OnceCell<Pool<CompactionJob>> = OnceCell::new();
// TODO init pool with compaction interval
// TODO spawn 20 worker threads

View File

@@ -0,0 +1,34 @@
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::{repository::Repository, tenant_mgr::{self, TenantState}};
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GcJob {
pub tenant: ZTenantId,
}
impl Job for GcJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<(), Self::ErrorType> {
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) {
// TODO Maybe record this as "didn't run"?
return Ok(());
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
Ok(())
}
}
pub static GC_POOL: OnceCell<Pool<GcJob>> = OnceCell::new();
// TODO init gc pool with gc interval
// TODO spawn 20 worker threads

View File

@@ -1 +1,3 @@
pub mod worker;
pub mod compaction;
pub mod gc;

View File

@@ -1,28 +1,33 @@
use std::{panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
// TODO maybe make jobs tenant-specific? Makes monitorin easier.
use std::{any::Any, fmt::Debug, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
pub trait Job: std::fmt::Debug + Send + Clone + 'static {
fn run(&self);
}
#[derive(Debug, Clone)]
enum JobStatus {
Ready,
Running(Instant), // TODO add worker id
Stuck, // TODO remember error
type ErrorType;
fn run(&self) -> Result<(), Self::ErrorType>;
}
#[derive(Debug)]
struct JobStatusTable<J: Job> {
enum JobError<J: Job> {
Panic(Box<dyn Any + Send>),
Error(J::ErrorType),
}
#[derive(Debug)]
enum JobStatus<J: Job> where J::ErrorType: Debug {
Ready,
Running(Instant), // TODO add worker id
Stuck(JobError<J>),
}
#[derive(Debug)]
struct JobStatusTable<J: Job> where J::ErrorType: Debug {
// TODO this vec is no good. Too much index arithmetic.
jobs: Vec<(J, JobStatus)>,
jobs: Vec<(J, JobStatus<J>)>,
next: usize,
begin: Instant,
period: Duration,
}
impl<J: Job> JobStatusTable<J> {
impl<J: Job> JobStatusTable<J> where J::ErrorType: Debug {
fn next(&mut self) -> Option<(usize, J)> {
while self.next < self.jobs.len() {
let curr = self.next;
@@ -34,7 +39,7 @@ impl<J: Job> JobStatusTable<J> {
return Some((curr, self.jobs[curr].0.clone()))
}
JobStatus::Running(_) => println!("Job already running, skipping this round"),
JobStatus::Stuck => println!("Job stuck, skipping"),
JobStatus::Stuck(_) => println!("Job stuck, skipping"),
}
}
None
@@ -53,12 +58,12 @@ impl<J: Job> JobStatusTable<J> {
}
#[derive(Debug)]
struct Pool<J: Job> {
pub struct Pool<J: Job> where J::ErrorType: Debug {
job_table: Mutex<JobStatusTable<J>>,
condvar: Condvar, // Notified when idle worker should wake up
}
impl<J: Job> Pool<J> {
impl<J: Job> Pool<J> where J::ErrorType: Debug {
fn new() -> Self {
Pool {
job_table: Mutex::new(JobStatusTable::<J> {
@@ -78,17 +83,21 @@ impl<J: Job> Pool<J> {
// Run job without holding lock
drop(job_table);
let result = panic::catch_unwind(AssertUnwindSafe(|| {
job.run();
job.run()
}));
job_table = self.job_table.lock().unwrap();
// Update job status
match result {
Ok(()) => {
Ok(Ok(())) => {
job_table.jobs[id].1 = JobStatus::Ready;
},
Ok(Err(e)) => {
job_table.jobs[id].1 = JobStatus::Stuck(JobError::Error(e));
println!("Job errored, thread is ok.");
},
Err(e) => {
job_table.jobs[id].1 = JobStatus::Stuck;
job_table.jobs[id].1 = JobStatus::Stuck(JobError::Panic(e));
println!("Job panicked, thread is ok.");
},
}
@@ -127,11 +136,14 @@ mod tests {
}
impl Job for PrintJob {
fn run(&self) {
type ErrorType = String;
fn run(&self) -> Result<(), String> {
if self.to_print == "pls panic" {
panic!("AAA");
}
println!("{}", self.to_print);
Ok(())
}
}