Compare commits

9 Commits

Author SHA1 Message Date
Bojan Serafimov
d84c4907a9 Add fair scheduler process 2022-05-23 15:11:38 -04:00
Bojan Serafimov
77c35906f5 Revert "WIP"
This reverts commit b70cf3f7b1.
2022-05-23 14:07:38 -04:00
Bojan Serafimov
b70cf3f7b1 WIP 2022-05-23 10:55:36 -04:00
Bojan Serafimov
304641afe5 Refactor 2022-05-23 00:42:14 -04:00
Bojan Serafimov
13ed1e32b4 Handle panic 2022-05-22 16:51:12 -04:00
Bojan Serafimov
858ce6a8b5 WIP 2022-05-16 13:07:50 -04:00
Bojan Serafimov
491a1870ce Cleanup 2022-05-11 18:55:58 -04:00
Bojan Serafimov
30376d87ac Improve logic 2022-05-11 18:39:46 -04:00
Bojan Serafimov
887b4248d2 WIP working on simple scheduler 2022-05-11 12:57:45 -04:00
10 changed files with 502 additions and 106 deletions

pageserver/src/jobs/chore.rs Normal file

@@ -0,0 +1,53 @@
use std::{marker::PhantomData, ops::Range, time::{Duration, Instant}};
use serde::{Deserialize, Serialize};
pub trait Job: std::fmt::Debug + Send + Copy + Clone + PartialEq + Eq + 'static {
type ErrorType: AsRef<dyn std::error::Error + 'static>;
fn run(&self) -> Result<(), Self::ErrorType>;
}
pub enum Schedule {
Every(Duration),
}
/// A job that repeats on a schedule
pub struct Chore<J: Job> {
pub job: J,
pub schedule: Schedule,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
pub struct ChoreHandle<J: Job> {
_marker: PhantomData<J>,
chore_id: u64,
}
pub enum Status<J: Job> {
Scheduled(Instant),
Error(J::ErrorType),
}
pub trait Scheduler<J: Job> {
fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J>;
fn remove_chore(&self, ch: ChoreHandle<J>);
fn get_status(&self, ch: ChoreHandle<J>) -> Status<J>;
}
/// Stub scheduler; the trait methods below are not implemented yet.
pub struct SimpleScheduler<J: Job> {
_marker: PhantomData<J>,
}
impl<J: Job> Scheduler<J> for SimpleScheduler<J> {
fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J> {
todo!()
}
fn remove_chore(&self, ch: ChoreHandle<J>) {
todo!()
}
fn get_status(&self, ch: ChoreHandle<J>) -> Status<J> {
todo!()
}
}
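
Taken together, Chore, ChoreHandle, and the Scheduler trait are the whole public surface of this module. A minimal usage sketch follows, with a hypothetical PingJob; it is illustrative only, since SimpleScheduler above still todo!()s every method:

use std::time::Duration;

// Hypothetical job for illustration; Job requires Debug + Send + Copy + Eq + 'static.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PingJob;

impl Job for PingJob {
    type ErrorType = anyhow::Error;
    fn run(&self) -> Result<(), Self::ErrorType> {
        println!("ping");
        Ok(())
    }
}

fn example<S: Scheduler<PingJob>>(scheduler: &S) {
    // Register a recurring chore and keep the typed handle.
    let handle = scheduler.add_chore(Chore {
        job: PingJob,
        schedule: Schedule::Every(Duration::from_secs(10)),
    });
    // ChoreHandle is Copy, so the same handle can be used more than once.
    match scheduler.get_status(handle) {
        Status::Scheduled(when) => println!("next run at {:?}", when),
        Status::Error(e) => println!("chore is stuck: {}", e.as_ref()),
    }
    // Deregister the chore when its owner goes away.
    scheduler.remove_chore(handle);
}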

pageserver/src/jobs/compaction.rs Normal file

@@ -0,0 +1,33 @@
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use tracing::*;
use crate::repository::Repository;
use crate::tenant_mgr::{self, TenantState};
use super::chore::{Job, SimpleScheduler};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompactionJob {
pub tenant: ZTenantId,
}
impl Job for CompactionJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<(), Self::ErrorType> {
// TODO why not kill the chore when tenant is not active?
// TODO GC has the same code too
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) {
return Ok(());
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
repo.compaction_iteration()?;
Ok(())
}
}
pub static COMPACTION_SCHEDULER: OnceCell<SimpleScheduler<CompactionJob>> = OnceCell::new();

pageserver/src/jobs/gc.rs Normal file

@@ -0,0 +1,35 @@
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use tracing::*;
use crate::repository::Repository;
use crate::tenant_mgr::{self, TenantState};
use super::chore::{Job, SimpleScheduler};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GcJob {
pub tenant: ZTenantId,
}
impl Job for GcJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<(), Self::ErrorType> {
// TODO why not kill the chore when tenant is not active?
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) {
return Ok(());
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
Ok(())
}
}
pub static GC_SCHEDULER: OnceCell<SimpleScheduler<GcJob>> = OnceCell::new();
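
GC_SCHEDULER and COMPACTION_SCHEDULER are process-wide OnceCells, and the tenant_mgr.rs hunk below calls .get().unwrap() on both, so pageserver startup has to populate them first. A sketch of that wiring, assuming a SimpleScheduler::new() constructor that this diff does not define:

use crate::jobs::chore::SimpleScheduler;
use crate::jobs::compaction::COMPACTION_SCHEDULER;
use crate::jobs::gc::GC_SCHEDULER;

// Hypothetical startup hook; SimpleScheduler::new() is an assumption.
pub fn init_job_schedulers() {
    if GC_SCHEDULER.set(SimpleScheduler::new()).is_err() {
        panic!("GC scheduler initialized twice");
    }
    if COMPACTION_SCHEDULER.set(SimpleScheduler::new()).is_err() {
        panic!("compaction scheduler initialized twice");
    }
}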

pageserver/src/jobs/mod.rs Normal file

@@ -0,0 +1,5 @@
pub mod scheduler;
pub mod worker;
pub mod chore;
pub mod gc;
pub mod compaction;

pageserver/src/jobs/scheduler.rs Normal file

@@ -0,0 +1,240 @@
use std::collections::{HashMap, VecDeque};
use std::ops::Add;
use tokio::time::{Duration, Instant};
use crate::thread_mgr::shutdown_watcher;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender, channel};
use tokio::time::{sleep, sleep_until};
use super::worker::{Job, Worker, Report};
pub struct Spawner<J: Job> {
send_worker: Sender<Worker<J>>,
send_report: Sender<Report<J>>,
}
impl<J: Job> Spawner<J> {
pub fn spawn_worker(&self) {
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
let enlist = self.send_worker.clone();
let report = self.send_report.clone();
thread_mgr::spawn(
ThreadKind::GcWorker,
None,
None,
"gc_worker_1",
true,
move || {
run_worker(enlist, report)
},
).unwrap();
}
}
/// Lifecycle of a recurring job, as tracked by the Scheduler.
#[derive(Debug)]
pub enum Status {
// Waiting to be sent to the board on the next tick
Scheduled,
// Sent to the board; a successful report moves it back to Scheduled
Running,
// The job panicked and will not be rescheduled
Stuck,
}
pub struct Scheduler<J: Job> {
send_work: Sender<J>,
recv_report: Receiver<Report<J>>,
pub period: Duration,
pub chores: Mutex<HashMap<J, Status>>,
}
impl<J: Job> Scheduler<J> {
pub async fn handle_report(&mut self, report: Report<J>) {
let job = report.for_job;
match report.result {
Ok(()) => {
// Reschedule job to run again
if let Some(status) = self.chores.lock().await.get_mut(&job) {
*status = Status::Scheduled;
}
},
Err(_) => {
// Mark the job as stuck so it is not rescheduled
println!("job {:?} panicked", job);
if let Some(status) = self.chores.lock().await.get_mut(&job) {
*status = Status::Stuck;
}
}
}
}
pub fn run(mut self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async {
let mut next_iteration = Instant::now();
loop {
tokio::select! {
_ = shutdown_watcher() => break,
_ = sleep_until(next_iteration) => {
next_iteration = Instant::now().add(self.period);
for (job, status) in self.chores.lock().await.iter_mut() {
if matches!(status, Status::Scheduled) {
self.send_work.send(job.clone()).await.unwrap();
*status = Status::Running;
}
}
}
report = self.recv_report.recv() => {
let report = report.expect("report channel closed");
self.handle_report(report).await;
}
}
}
});
Ok(())
}
}
#[derive(Debug)]
pub struct Board<J: Job> {
workers: Vec<Worker<J>>,
jobs: VecDeque<J>,
recv_work: Receiver<J>,
recv_worker: Receiver<Worker<J>>,
}
impl<J: Job> Board<J> {
pub fn new() -> (Board<J>, Spawner<J>, Scheduler<J>) {
let worker = channel::<Worker<J>>(100);
let work = channel::<J>(100);
let report = channel::<Report<J>>(100);
let board = Board {
workers: vec![],
jobs: VecDeque::new(),
recv_worker: worker.1,
recv_work: work.1,
};
let spawner = Spawner {
send_worker: worker.0,
send_report: report.0,
};
let scheduler = Scheduler {
send_work: work.0,
recv_report: report.1,
period: Duration::from_millis(10),
chores: Mutex::new(HashMap::new()),
};
(board, spawner, scheduler)
}
pub async fn handle_job(&mut self, job: J) {
// Assign to a worker if any are available
while let Some(w) = self.workers.pop() {
if let Ok(()) = w.0.send(job.clone()).await {
return;
}
}
self.jobs.push_back(job);
}
pub async fn handle_worker(&mut self, worker: Worker<J>) {
// Assign jobs if any are queued
if let Some(j) = self.jobs.pop_front() {
worker.0.send(j).await.ok();
} else {
self.workers.push(worker);
}
}
pub fn run(mut self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async {
loop {
tokio::select! {
_ = shutdown_watcher() => break,
worker = self.recv_worker.recv() => {
let worker = worker.expect("worker channel closed");
self.handle_worker(worker).await;
},
job = self.recv_work.recv() => {
let job = job.expect("job channel closed");
self.handle_job(job).await;
},
}
}
});
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::thread_mgr::{self, ThreadKind};
use super::*;
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct PrintJob {
to_print: String
}
impl Job for PrintJob {
fn run(&self) {
println!("{}", self.to_print);
}
}
#[tokio::test]
async fn sched_1() {
let (board, spawner, scheduler) = Board::<PrintJob>::new();
// Schedule recurring job
let j = PrintJob {
to_print: "hello from job".to_string(),
};
scheduler.chores.lock().await.insert(j, Status::Scheduled);
// Spawn board
thread_mgr::spawn(
ThreadKind::GcScheduler,
None,
None,
"gc_scheduler",
true,
move || {
board.run()
},
).unwrap();
// Spawn scheduler
thread_mgr::spawn(
ThreadKind::GcScheduler,
None,
None,
"gc_scheduler",
true,
move || {
scheduler.run()
},
).unwrap();
// Spawn worker
spawner.spawn_worker();
// Wait for job to run a few times
sleep(Duration::from_millis(100)).await;
// Cleanup
thread_mgr::shutdown_threads(None, None, None);
}
}
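
The three pieces talk only over channels: the Scheduler pushes due jobs to the Board on every tick, idle workers enlist themselves with the Board, and finished work flows back to the Scheduler as Reports. The Board's queueing can be exercised on its own; here is a sketch of such a test (hypothetical, assumed to sit in the same tests module so Board's private fields are in scope):

#[tokio::test]
async fn board_queues_until_worker_arrives() {
    let (mut board, _spawner, _scheduler) = Board::<PrintJob>::new();
    let job = PrintJob { to_print: "queued".to_string() };

    // No workers have enlisted yet, so the job is parked on the board.
    board.handle_job(job.clone()).await;

    // The first worker to enlist is handed the queued job immediately.
    let (send, mut recv) = tokio::sync::mpsc::channel(1);
    board.handle_worker(Worker(send)).await;
    assert_eq!(recv.recv().await, Some(job));
}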

pageserver/src/jobs/worker.rs Normal file

@@ -0,0 +1,108 @@
//!
//! Worker thread that can be used in a thread pool to process jobs.
//!
use crate::thread_mgr::shutdown_watcher;
use tokio::sync::mpsc::{Sender, channel};
use std::any::Any;
use std::hash::Hash;
use std::panic::AssertUnwindSafe;
use std::panic::catch_unwind;
pub trait Job: std::fmt::Debug + Send + 'static + Clone + PartialEq + Eq + Hash {
fn run(&self);
}
#[derive(Debug)]
pub struct Worker<J: Job>(pub Sender<J>);
#[derive(Debug)]
pub struct Report<J: Job> {
pub for_job: J,
pub result: Result<(), Box<dyn Any + Send>>
}
pub fn run_worker<J: Job>(enlist: Sender<Worker<J>>, report: Sender<Report<J>>) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async {
loop {
let (send_work, mut get_work) = channel::<J>(100);
enlist.send(Worker(send_work)).await.unwrap();
let shutdown_watcher = shutdown_watcher();
tokio::select! {
_ = shutdown_watcher => break,
j = get_work.recv() => {
if let Some(job) = j {
let result = catch_unwind(AssertUnwindSafe(|| {
job.run();
}));
report.send(Report {
for_job: job,
result,
}).await.unwrap();
} else {
// channel closed
return;
}
}
};
}
});
Ok(())
}
#[cfg(test)]
mod tests {
use crate::thread_mgr::{self, ThreadKind};
use super::*;
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct PrintJob {
to_print: String
}
impl Job for PrintJob {
fn run(&self) {
println!("{}", self.to_print);
}
}
#[tokio::test]
async fn worker_1() {
let mut worker = channel::<Worker<PrintJob>>(100);
let mut result = channel::<Report<PrintJob>>(100);
thread_mgr::spawn(
ThreadKind::GcWorker,
None,
None,
"gc_worker_1",
true,
move || {
run_worker(worker.0, result.0)
},
).unwrap();
let j = PrintJob {
to_print: "hello from job".to_string(),
};
let w = worker.1.recv().await.unwrap();
w.0.send(j.clone()).await.unwrap();
println!("waiting for result");
let report = result.1.recv().await.unwrap();
assert_eq!(j, report.for_job);
println!("got result");
thread_mgr::shutdown_threads(None, None, None);
}
#[test]
fn worker_cancellation() {
// TODO: exercise worker shutdown when the job channel closes
}
}
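
The catch_unwind in run_worker is what turns a panicking job into an Err report instead of a dead worker thread, and the empty worker_cancellation stub suggests more tests were planned. A sketch of a test for the panic path (hypothetical, same module as the tests above):

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct PanicJob;

impl Job for PanicJob {
    fn run(&self) {
        panic!("boom");
    }
}

#[tokio::test]
async fn worker_reports_panic() {
    let (send_worker, mut recv_worker) = channel::<Worker<PanicJob>>(100);
    let (send_report, mut recv_report) = channel::<Report<PanicJob>>(100);
    thread_mgr::spawn(
        ThreadKind::GcWorker,
        None,
        None,
        "panic_worker",
        true,
        move || run_worker(send_worker, send_report),
    ).unwrap();

    // The panic is caught by run_worker and forwarded as an Err report.
    let w = recv_worker.recv().await.unwrap();
    w.0.send(PanicJob).await.unwrap();
    let report = recv_report.recv().await.unwrap();
    assert_eq!(report.for_job, PanicJob);
    assert!(report.result.is_err());
    thread_mgr::shutdown_threads(None, None, None);
}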

pageserver/src/lib.rs

@@ -13,7 +13,6 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;
@@ -21,6 +20,7 @@ pub mod walingest;
pub mod walreceiver;
pub mod walrecord;
pub mod walredo;
pub mod jobs;
use lazy_static::lazy_static;
use tracing::info;

pageserver/src/tenant_mgr.rs

@@ -2,6 +2,9 @@
//! page server.
use crate::config::PageServerConf;
use crate::jobs::chore::{Chore, ChoreHandle, Schedule, Scheduler};
use crate::jobs::compaction::{COMPACTION_SCHEDULER, CompactionJob};
use crate::jobs::gc::{GC_SCHEDULER, GcJob};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{Repository, TimelineSyncStatusUpdate};
@@ -21,6 +24,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
use utils::zid::{ZTenantId, ZTimelineId};
@@ -71,7 +75,7 @@ pub enum TenantState {
//Ready,
// This tenant exists on local disk, and the layer map has been loaded into memory.
// The local disk might have some newer files that don't exist in cloud storage yet.
Active,
Active(ChoreHandle<GcJob>, ChoreHandle<CompactionJob>),
// Tenant is active, but there is no walreceiver connection.
Idle,
// This tenant exists on local disk, and the layer map has been loaded into memory.
@@ -83,7 +87,7 @@ pub enum TenantState {
impl fmt::Display for TenantState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TenantState::Active => f.write_str("Active"),
TenantState::Active(_, _) => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"),
}
@@ -236,35 +240,26 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
match tenant.state {
// If the tenant is already active, nothing to do.
TenantState::Active => {}
TenantState::Active(_, _) => {}
// If it's Idle, launch the compactor and GC threads
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let repo = crate::tenant_mgr::get_repository_for_tenant(tenant_id)?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
let compaction_chore_handle = COMPACTION_SCHEDULER.get().unwrap().add_chore(Chore {
job: CompactionJob {
tenant: tenant_id,
},
schedule: Schedule::Every(repo.get_compaction_period()),
});
if let Err(e) = &gc_spawn_result {
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
let gc_chore_handle = GC_SCHEDULER.get().unwrap().add_chore(Chore {
job: GcJob {
tenant: tenant_id,
},
schedule: Schedule::Every(repo.get_gc_period()),
});
tenant.state = TenantState::Active(gc_chore_handle, compaction_chore_handle);
}
TenantState::Stopping => {
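
Because the two ChoreHandles now live inside TenantState::Active, a deactivation path could remove the chores outright instead of leaving each job to re-check tenant state on every run (the TODO in CompactionJob::run and GcJob::run). A sketch of that teardown, assuming the surrounding deactivation logic and that the Scheduler trait is in scope:

// Hypothetical teardown; not part of this diff.
if let TenantState::Active(gc, compaction) = &tenant.state {
    let (gc, compaction) = (*gc, *compaction); // ChoreHandle is Copy
    GC_SCHEDULER.get().unwrap().remove_chore(gc);
    COMPACTION_SCHEDULER.get().unwrap().remove_chore(compaction);
    tenant.state = TenantState::Idle;
}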

pageserver/src/tenant_threads.rs

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}

pageserver/src/thread_mgr.rs

@@ -97,6 +97,12 @@ pub enum ThreadKind {
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Thread that schedules GC tasks
GcScheduler,
// Thread that works on GC tasks
GcWorker,
// Thread that handles GC of a tenant
GarbageCollector,