This commit is contained in:
Bojan Serafimov
2022-05-24 21:54:55 -04:00
parent f9c1fc8657
commit 079871f8e3
8 changed files with 130 additions and 68 deletions

View File

@@ -9,7 +9,18 @@ use clap::{App, Arg};
use daemonize::Daemonize;
use fail::FailScenario;
use pageserver::{LOG_FILE_NAME, config::{defaults::*, PageServerConf}, http, page_cache, page_service, profiling, tenant_jobs::{compaction::{COMPACTION_POOL, CompactionJob}, gc::{GC_POOL, GcJob}, worker::Pool}, tenant_mgr, thread_mgr, thread_mgr::ThreadKind, timelines, virtual_file};
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling,
tenant_jobs::{
compaction::{CompactionJob, COMPACTION_POOL},
gc::{GcJob, GC_POOL},
worker::Pool,
},
tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
timelines, virtual_file, LOG_FILE_NAME,
};
use utils::{
auth::JwtAuth,
http::endpoint,
@@ -310,10 +321,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
None,
&name.clone(),
true,
move || {
GC_POOL.get().unwrap().worker_main(name.clone())
},
).unwrap();
move || GC_POOL.get().unwrap().worker_main(name.clone()),
)
.unwrap();
}
// Spawn compaction workers
@@ -326,10 +336,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
None,
&name.clone(),
true,
move || {
GC_POOL.get().unwrap().worker_main(name.clone())
},
).unwrap();
move || GC_POOL.get().unwrap().worker_main(name.clone()),
)
.unwrap();
}
signals.handle(|signal| match signal {

View File

@@ -0,0 +1 @@

View File

@@ -12,6 +12,7 @@ pub mod reltag;
pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_jobs;
pub mod tenant_mgr;
pub mod thread_mgr;
pub mod timelines;
@@ -20,7 +21,6 @@ pub mod walingest;
pub mod walreceiver;
pub mod walrecord;
pub mod walredo;
pub mod tenant_jobs;
use lazy_static::lazy_static;
use tracing::info;

View File

@@ -1,14 +1,13 @@
use std::{ops::Add, time::Instant};
use crate::repository::Repository;
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::repository::Repository;
use crate::tenant_mgr::{self, TenantState};
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompactionJob {
pub tenant: ZTenantId,
@@ -19,7 +18,10 @@ impl Job for CompactionJob {
fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
// Don't reschedule job if tenant isn't active
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) {
if !matches!(
tenant_mgr::get_tenant_state(self.tenant),
Some(TenantState::Active)
) {
return Ok(None);
}

View File

@@ -1,8 +1,11 @@
use std::{ops::Add, time::Instant};
use crate::{
repository::Repository,
tenant_mgr::{self, TenantState},
};
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::{repository::Repository, tenant_mgr::{self, TenantState}};
use super::worker::{Job, Pool};
@@ -16,7 +19,10 @@ impl Job for GcJob {
fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
// Don't reschedule job if tenant isn't active
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) {
if !matches!(
tenant_mgr::get_tenant_state(self.tenant),
Some(TenantState::Active)
) {
return Ok(None);
}

View File

@@ -1,3 +1,3 @@
pub mod worker;
pub mod compaction;
pub mod gc;
pub mod worker;

View File

@@ -1,6 +1,15 @@
use std::{any::Any, collections::{BinaryHeap, HashMap}, fmt::Debug, hash::Hash, ops::Add, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
use lazy_static::lazy_static;
use metrics::{register_gauge_vec, GaugeVec};
use std::{
any::Any,
collections::{BinaryHeap, HashMap},
fmt::Debug,
hash::Hash,
ops::Add,
panic::{self, AssertUnwindSafe},
sync::{Condvar, Mutex},
time::{Duration, Instant},
};
lazy_static! {
static ref POOL_UTILIZATION_GAUGE: GaugeVec = register_gauge_vec!(
@@ -23,7 +32,10 @@ enum JobError<J: Job> {
}
#[derive(Debug)]
enum JobStatus<J: Job> where J::ErrorType: Debug {
enum JobStatus<J: Job>
where
J::ErrorType: Debug,
{
Ready {
scheduled_for: Instant,
},
@@ -36,33 +48,48 @@ enum JobStatus<J: Job> where J::ErrorType: Debug {
// TODO make this generic event, put in different module
#[derive(Debug)]
struct Deadline<J: Job> where J::ErrorType: Debug {
struct Deadline<J: Job>
where
J::ErrorType: Debug,
{
start_by: Instant,
job: J,
}
impl<J: Job> PartialOrd for Deadline<J> where J::ErrorType: Debug {
impl<J: Job> PartialOrd for Deadline<J>
where
J::ErrorType: Debug,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
other.start_by.partial_cmp(&self.start_by)
}
}
impl<J: Job> Ord for Deadline<J> where J::ErrorType: Debug {
impl<J: Job> Ord for Deadline<J>
where
J::ErrorType: Debug,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.start_by.cmp(&self.start_by)
}
}
impl<J: Job> PartialEq for Deadline<J> where J::ErrorType: Debug {
impl<J: Job> PartialEq for Deadline<J>
where
J::ErrorType: Debug,
{
fn eq(&self, other: &Self) -> bool {
self.start_by == other.start_by
}
}
impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug { }
impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug {}
#[derive(Debug)]
struct JobStatusTable<J: Job> where J::ErrorType: Debug {
struct JobStatusTable<J: Job>
where
J::ErrorType: Debug,
{
/// Complete summary of current state
status: HashMap<J, JobStatus<J>>,
@@ -70,7 +97,10 @@ struct JobStatusTable<J: Job> where J::ErrorType: Debug {
queue: BinaryHeap<Deadline<J>>,
}
impl<J: Job> JobStatusTable<J> where J::ErrorType: Debug {
impl<J: Job> JobStatusTable<J>
where
J::ErrorType: Debug,
{
fn pop_due(&mut self) -> Option<Deadline<J>> {
if let Some(deadline) = self.queue.peek() {
if Instant::now() > deadline.start_by {
@@ -87,12 +117,18 @@ impl<J: Job> JobStatusTable<J> where J::ErrorType: Debug {
}
#[derive(Debug)]
pub struct Pool<J: Job> where J::ErrorType: Debug {
pub struct Pool<J: Job>
where
J::ErrorType: Debug,
{
job_table: Mutex<JobStatusTable<J>>,
condvar: Condvar, // Notified when idle worker should wake up
condvar: Condvar, // Notified when idle worker should wake up
}
impl<J: Job> Pool<J> where J::ErrorType: Debug {
impl<J: Job> Pool<J>
where
J::ErrorType: Debug,
{
pub fn new() -> Self {
Pool {
job_table: Mutex::new(JobStatusTable::<J> {
@@ -107,43 +143,51 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
let mut job_table = self.job_table.lock().unwrap();
loop {
if let Some(Deadline {job, ..}) = job_table.pop_due() {
job_table.set_status(&job, JobStatus::Running {
worker_name: worker_name.clone(),
started_at: Instant::now(),
});
if let Some(Deadline { job, .. }) = job_table.pop_due() {
job_table.set_status(
&job,
JobStatus::Running {
worker_name: worker_name.clone(),
started_at: Instant::now(),
},
);
// Run job without holding lock
drop(job_table);
POOL_UTILIZATION_GAUGE.with_label_values(&["todo_put_pool_name_here"]).inc();
let result = panic::catch_unwind(AssertUnwindSafe(|| {
job.run()
}));
POOL_UTILIZATION_GAUGE.with_label_values(&["todo_put_pool_name_here"]).dec();
POOL_UTILIZATION_GAUGE
.with_label_values(&["todo_put_pool_name_here"])
.inc();
let result = panic::catch_unwind(AssertUnwindSafe(|| job.run()));
POOL_UTILIZATION_GAUGE
.with_label_values(&["todo_put_pool_name_here"])
.dec();
job_table = self.job_table.lock().unwrap();
// Update job status
match result {
Ok(Ok(Some(reschedule_for))) => {
job_table.set_status(&job, JobStatus::Ready {
scheduled_for: reschedule_for,
});
job_table.set_status(
&job,
JobStatus::Ready {
scheduled_for: reschedule_for,
},
);
job_table.queue.push(Deadline {
job: job.clone(),
start_by: reschedule_for,
});
},
}
Ok(Ok(None)) => {
job_table.status.remove(&job);
},
}
Ok(Err(e)) => {
job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e)));
println!("Job errored, thread is ok.");
},
}
Err(e) => {
job_table.set_status(&job, JobStatus::Stuck(JobError::Panic(e)));
println!("Job panicked, thread is ok.");
},
}
}
} else {
match job_table.queue.peek() {
@@ -162,9 +206,9 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
pub fn queue_job(&self, job: J) {
let mut job_table = self.job_table.lock().unwrap();
let scheduled_for = Instant::now();
job_table.status.insert(job.clone(), JobStatus::Ready {
scheduled_for,
});
job_table
.status
.insert(job.clone(), JobStatus::Ready { scheduled_for });
job_table.queue.push(Deadline {
job: job.clone(),
start_by: scheduled_for,
@@ -180,12 +224,12 @@ mod tests {
use once_cell::sync::OnceCell;
use crate::thread_mgr::{self, ThreadKind};
use super::*;
use crate::thread_mgr::{self, ThreadKind};
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
struct PrintJob {
to_print: String
to_print: String,
}
impl Job for PrintJob {
@@ -207,26 +251,24 @@ mod tests {
TEST_POOL.set(Pool::<PrintJob>::new()).unwrap();
thread_mgr::spawn(
ThreadKind::GarbageCollector, // change this
ThreadKind::GarbageCollector, // change this
None,
None,
"test_worker_1",
true,
move || {
TEST_POOL.get().unwrap().worker_main("test_worker_1".into())
},
).unwrap();
move || TEST_POOL.get().unwrap().worker_main("test_worker_1".into()),
)
.unwrap();
thread_mgr::spawn(
ThreadKind::GarbageCollector, // change this
ThreadKind::GarbageCollector, // change this
None,
None,
"test_worker_2",
true,
move || {
TEST_POOL.get().unwrap().worker_main("test_worker_2".into())
},
).unwrap();
move || TEST_POOL.get().unwrap().worker_main("test_worker_2".into()),
)
.unwrap();
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job".to_string(),

View File

@@ -8,8 +8,8 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::storage_sync::index::RemoteIndex;
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
use crate::tenant_jobs::compaction::{COMPACTION_POOL, CompactionJob};
use crate::tenant_jobs::gc::{GC_POOL, GcJob};
use crate::tenant_jobs::compaction::{CompactionJob, COMPACTION_POOL};
use crate::tenant_jobs::gc::{GcJob, GC_POOL};
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
@@ -245,13 +245,15 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
// Important to activate before scheduling jobs
tenant.state = TenantState::Active;
GC_POOL.get().unwrap().queue_job(GcJob {
tenant: tenant_id,
});
GC_POOL
.get()
.unwrap()
.queue_job(GcJob { tenant: tenant_id });
COMPACTION_POOL.get().unwrap().queue_job(CompactionJob {
tenant: tenant_id,
});
COMPACTION_POOL
.get()
.unwrap()
.queue_job(CompactionJob { tenant: tenant_id });
}
TenantState::Stopping => {