Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-18 02:42:56 +00:00)

Compare commits (24) between RemoteExte... and ps-thread-
Commits (SHA1):
8c78a5e9a6, a8f0124af7, 88154f9871, 14b8dcab5b, aee8615b30, 079871f8e3,
f9c1fc8657, 32b4e0fda2, 6b4e17ef89, 07e6dd809d, b028f12b06, 3975417cae,
98d8d65c83, 1e9b628c25, 31dc9e6abd, 0d5fee3ab7, dc47f9ccf1, fefbff8981,
eefc7aa792, f3c71899be, de55b2f139, 5a19ac02c9, 13b374a2df, b9814222f9
pageserver/src/bin/pageserver.rs

@@ -11,7 +11,13 @@ use daemonize::Daemonize;
 use fail::FailScenario;
 use pageserver::{
     config::{defaults::*, PageServerConf},
-    http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
+    http, page_cache, page_service, profiling,
+    tenant_jobs::{
+        compaction::{CompactionJob, COMPACTION_POOL},
+        gc::{GcJob, GC_POOL},
+        worker::Pool,
+    },
+    tenant_mgr, thread_mgr,
     thread_mgr::ThreadKind,
     timelines, virtual_file, LOG_FILE_NAME,
 };
@@ -302,6 +308,36 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
         move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type),
     )?;

+    // Spawn GC workers
+    GC_POOL.set(Pool::<GcJob>::new()).unwrap();
+    for i in 0..3 {
+        let name = format!("gc_worker_{}", i);
+        thread_mgr::spawn(
+            ThreadKind::GarbageCollectionWorker,
+            None,
+            None,
+            &name.clone(),
+            true,
+            move || GC_POOL.get().unwrap().worker_main(name),
+        )
+        .unwrap();
+    }
+
+    // Spawn compaction workers
+    COMPACTION_POOL.set(Pool::<CompactionJob>::new()).unwrap();
+    for i in 0..3 {
+        let name = format!("compaction_worker_{}", i);
+        thread_mgr::spawn(
+            ThreadKind::CompactionWorker,
+            None,
+            None,
+            &name.clone(),
+            true,
+            move || COMPACTION_POOL.get().unwrap().worker_main(name),
+        )
+        .unwrap();
+    }
+
     signals.handle(|signal| match signal {
         Signal::Quit => {
             info!(
pageserver/src/lib.rs

@@ -12,8 +12,8 @@ pub mod reltag;
 pub mod repository;
 pub mod storage_sync;
 pub mod tenant_config;
+pub mod tenant_jobs;
 pub mod tenant_mgr;
-pub mod tenant_threads;
 pub mod thread_mgr;
 pub mod timelines;
 pub mod virtual_file;
pageserver/src/tenant_jobs/compaction.rs (new file, 35 lines)

@@ -0,0 +1,35 @@
+use std::{ops::Add, time::Instant};
+
+use crate::repository::Repository;
+use once_cell::sync::OnceCell;
+use utils::zid::ZTenantId;
+
+use crate::tenant_mgr::{self, TenantState};
+
+use super::worker::{Job, Pool};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct CompactionJob {
+    pub tenant: ZTenantId,
+}
+
+impl Job for CompactionJob {
+    type ErrorType = anyhow::Error;
+
+    fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
+        // Don't reschedule job if tenant isn't active
+        if !matches!(
+            tenant_mgr::get_tenant_state(self.tenant),
+            Some(TenantState::Active)
+        ) {
+            return Ok(None);
+        }
+
+        let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
+        repo.compaction_iteration()?;
+
+        Ok(Some(Instant::now().add(repo.get_compaction_period())))
+    }
+}
+
+pub static COMPACTION_POOL: OnceCell<Pool<CompactionJob>> = OnceCell::new();
pageserver/src/tenant_jobs/gc.rs (new file, 39 lines)

@@ -0,0 +1,39 @@
+use std::{ops::Add, time::Instant};
+
+use crate::{
+    repository::Repository,
+    tenant_mgr::{self, TenantState},
+};
+use once_cell::sync::OnceCell;
+use utils::zid::ZTenantId;
+
+use super::worker::{Job, Pool};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct GcJob {
+    pub tenant: ZTenantId,
+}
+
+impl Job for GcJob {
+    type ErrorType = anyhow::Error;
+
+    fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
+        // Don't reschedule job if tenant isn't active
+        if !matches!(
+            tenant_mgr::get_tenant_state(self.tenant),
+            Some(TenantState::Active)
+        ) {
+            return Ok(None);
+        }
+
+        let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
+        let gc_horizon = repo.get_gc_horizon();
+        if gc_horizon > 0 {
+            repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
+        }
+
+        Ok(Some(Instant::now().add(repo.get_gc_period())))
+    }
+}
+
+pub static GC_POOL: OnceCell<Pool<GcJob>> = OnceCell::new();
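For orientation, here is a hypothetical third job type (not part of this change; `OneShotJob` and `ONE_SHOT_POOL` are made-up names) written against the `Job` trait that pageserver/src/tenant_jobs/worker.rs introduces below. It illustrates the scheduling contract of `run()`: `Ok(Some(instant))` asks the pool to run the job again at `instant`, `Ok(None)` drops the job from the pool, and `Err(_)` parks it as stuck.

// Hypothetical example only, sketched against the trait defined in worker.rs.
use std::time::Instant;

use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;

use crate::tenant_jobs::worker::{Job, Pool};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OneShotJob {
    pub tenant: ZTenantId,
}

impl Job for OneShotJob {
    type ErrorType = anyhow::Error;

    fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
        // Do the tenant-scoped work exactly once here, then return None so the
        // worker removes the job from its status table instead of rescheduling it.
        Ok(None)
    }
}

// Same wiring as GC_POOL and COMPACTION_POOL: a process-wide pool that the
// binary initializes at startup and the workers drain.
pub static ONE_SHOT_POOL: OnceCell<Pool<OneShotJob>> = OnceCell::new();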
pageserver/src/tenant_jobs/mod.rs (new file, 3 lines)

@@ -0,0 +1,3 @@
+pub mod compaction;
+pub mod gc;
+pub mod worker;
pageserver/src/tenant_jobs/worker.rs (new file, 294 lines)

@@ -0,0 +1,294 @@
+use lazy_static::lazy_static;
+use metrics::{register_gauge_vec, GaugeVec};
+use std::{
+    any::Any,
+    collections::{BinaryHeap, HashMap},
+    fmt::Debug,
+    hash::Hash,
+    panic::{self, AssertUnwindSafe},
+    sync::{Arc, Condvar, Mutex},
+    time::Instant,
+};
+
+use crate::thread_mgr::{get_shutdown_aware_condvar, is_shutdown_requested};
+
+lazy_static! {
+    static ref POOL_UTILIZATION_GAUGE: GaugeVec = register_gauge_vec!(
+        "pageserver_pool_utilization",
+        "Number of busy workers",
+        &["pool_name"]
+    )
+    .expect("Failed to register pageserver_pool_utilization gauge vec");
+}
+pub trait Job: std::fmt::Debug + Send + Clone + PartialOrd + Ord + Hash + 'static {
+    type ErrorType;
+    fn run(&self) -> Result<Option<Instant>, Self::ErrorType>;
+}
+
+#[derive(Debug)]
+enum JobError<J: Job> {
+    Panic(Box<dyn Any + Send>),
+    Error(J::ErrorType),
+}
+
+#[derive(Debug)]
+enum JobStatus<J: Job>
+where
+    J::ErrorType: Debug,
+{
+    Ready {
+        #[allow(dead_code)]
+        scheduled_for: Instant,
+    },
+    Running {
+        #[allow(dead_code)]
+        worker_name: String,
+
+        #[allow(dead_code)]
+        started_at: Instant,
+    },
+    Stuck(JobError<J>),
+}
+
+// TODO make this generic event, put in different module
+#[derive(Debug)]
+struct Deadline<J: Job>
+where
+    J::ErrorType: Debug,
+{
+    start_by: Instant,
+    job: J,
+}
+
+impl<J: Job> PartialOrd for Deadline<J>
+where
+    J::ErrorType: Debug,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        other.start_by.partial_cmp(&self.start_by)
+    }
+}
+
+impl<J: Job> Ord for Deadline<J>
+where
+    J::ErrorType: Debug,
+{
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        other.start_by.cmp(&self.start_by)
+    }
+}
+
+impl<J: Job> PartialEq for Deadline<J>
+where
+    J::ErrorType: Debug,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.start_by == other.start_by
+    }
+}
+
+impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug {}
+#[derive(Debug, Default)]
+struct JobStatusTable<J: Job>
+where
+    J::ErrorType: Debug,
+{
+    /// Complete summary of current state
+    status: HashMap<J, JobStatus<J>>,
+
+    /// Index over status for finding the next scheduled job
+    queue: BinaryHeap<Deadline<J>>,
+}
+
+impl<J: Job> JobStatusTable<J>
+where
+    J::ErrorType: Debug,
+{
+    fn pop_due(&mut self) -> Option<Deadline<J>> {
+        if let Some(deadline) = self.queue.peek() {
+            if Instant::now() > deadline.start_by {
+                return self.queue.pop();
+            }
+        }
+        None
+    }
+
+    fn set_status(&mut self, job: &J, status: JobStatus<J>) {
+        let s = self.status.get_mut(job).expect("status not found");
+        *s = status;
+    }
+}
+
+#[derive(Debug)]
+pub struct Pool<J: Job>
+where
+    J::ErrorType: Debug,
+{
+    job_table: Mutex<JobStatusTable<J>>,
+    condvar: Arc<Condvar>, // Notified when idle worker should wake up
+}
+
+impl<J: Job> Default for Pool<J>
+where
+    J::ErrorType: Debug,
+{
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<J: Job> Pool<J>
+where
+    J::ErrorType: Debug,
+{
+    pub fn new() -> Self {
+        Pool {
+            job_table: Mutex::new(JobStatusTable::<J> {
+                status: HashMap::<J, JobStatus<J>>::new(),
+                queue: BinaryHeap::<Deadline<J>>::new(),
+            }),
+            condvar: get_shutdown_aware_condvar(),
+        }
+    }
+
+    pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
+        let mut job_table = self.job_table.lock().unwrap();
+        while !is_shutdown_requested() {
+            if let Some(Deadline { job, .. }) = job_table.pop_due() {
+                job_table.set_status(
+                    &job,
+                    JobStatus::Running {
+                        worker_name: worker_name.clone(),
+                        started_at: Instant::now(),
+                    },
+                );
+
+                // Run job without holding lock
+                drop(job_table);
+                POOL_UTILIZATION_GAUGE
+                    .with_label_values(&["todo_put_pool_name_here"])
+                    .inc();
+                let result = panic::catch_unwind(AssertUnwindSafe(|| job.run()));
+                POOL_UTILIZATION_GAUGE
+                    .with_label_values(&["todo_put_pool_name_here"])
+                    .dec();
+                job_table = self.job_table.lock().unwrap();
+
+                // Update job status
+                match result {
+                    Ok(Ok(Some(reschedule_for))) => {
+                        job_table.set_status(
+                            &job,
+                            JobStatus::Ready {
+                                scheduled_for: reschedule_for,
+                            },
+                        );
+                        job_table.queue.push(Deadline {
+                            job: job.clone(),
+                            start_by: reschedule_for,
+                        });
+                    }
+                    Ok(Ok(None)) => {
+                        job_table.status.remove(&job);
+                    }
+                    Ok(Err(e)) => {
+                        job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e)));
+                        println!("Job errored, thread is ok.");
+                    }
+                    Err(e) => {
+                        job_table.set_status(&job, JobStatus::Stuck(JobError::Panic(e)));
+                        println!("Job panicked, thread is ok.");
+                    }
+                }
+            } else {
+                match job_table.queue.peek() {
+                    Some(deadline) => {
+                        let wait_time = deadline.start_by.duration_since(Instant::now());
+                        job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0;
+                    }
+                    None => {
+                        job_table = self.condvar.wait(job_table).unwrap();
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn queue_job(&self, job: J) {
+        let mut job_table = self.job_table.lock().unwrap();
+        let scheduled_for = Instant::now();
+        job_table
+            .status
+            .insert(job.clone(), JobStatus::Ready { scheduled_for });
+        job_table.queue.push(Deadline {
+            job,
+            start_by: scheduled_for,
+        });
+
+        self.condvar.notify_all();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{ops::Add, time::Duration};
+
+    use once_cell::sync::OnceCell;
+
+    use super::*;
+    use crate::thread_mgr::{self, ThreadKind};
+
+    #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
+    struct PrintJob {
+        to_print: String,
+    }
+
+    impl Job for PrintJob {
+        type ErrorType = String;
+
+        fn run(&self) -> Result<Option<Instant>, String> {
+            if self.to_print == "pls panic" {
+                panic!("AAA");
+            }
+            println!("{}", self.to_print);
+            Ok(Some(Instant::now().add(Duration::from_millis(10))))
+        }
+    }
+
+    static TEST_POOL: OnceCell<Pool<PrintJob>> = OnceCell::new();
+
+    #[tokio::test]
+    async fn pool_1() {
+        TEST_POOL.set(Pool::<PrintJob>::new()).unwrap();
+
+        thread_mgr::spawn(
+            ThreadKind::GarbageCollectionWorker,
+            None,
+            None,
+            "test_worker_1",
+            true,
+            move || TEST_POOL.get().unwrap().worker_main("test_worker_1".into()),
+        )
+        .unwrap();
+
+        thread_mgr::spawn(
+            ThreadKind::GarbageCollectionWorker,
+            None,
+            None,
+            "test_worker_2",
+            true,
+            move || TEST_POOL.get().unwrap().worker_main("test_worker_2".into()),
+        )
+        .unwrap();
+
+        TEST_POOL.get().unwrap().queue_job(PrintJob {
+            to_print: "hello from job".to_string(),
+        });
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+}
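A note on the `Deadline` ordering in worker.rs above: `std::collections::BinaryHeap` is a max-heap, so `Ord` is implemented with the operands swapped (`other.start_by.cmp(&self.start_by)`), which makes `pop()` yield the entry with the earliest `start_by`. A self-contained sketch of that trick (illustrative names, std only, not taken from the diff):

use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

#[derive(Debug)]
struct Deadline {
    start_by: Instant,
    job: &'static str, // stand-in for a real job type
}

impl PartialEq for Deadline {
    fn eq(&self, other: &Self) -> bool {
        self.start_by == other.start_by
    }
}

impl Eq for Deadline {}

impl PartialOrd for Deadline {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Deadline {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Swapped on purpose: the earliest deadline compares as the "greatest"
        // element, so the max-heap behaves like a min-heap over start_by.
        other.start_by.cmp(&self.start_by)
    }
}

fn main() {
    let now = Instant::now();
    let mut queue = BinaryHeap::new();
    queue.push(Deadline { start_by: now + Duration::from_secs(30), job: "compaction" });
    queue.push(Deadline { start_by: now + Duration::from_secs(5), job: "gc" });

    // The soonest deadline comes out first.
    assert_eq!(queue.pop().unwrap().job, "gc");
    assert_eq!(queue.pop().unwrap().job, "compaction");
}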
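Similarly, `worker_main` wraps `job.run()` in `panic::catch_unwind(AssertUnwindSafe(..))` so a panicking job is recorded as `Stuck` without killing the worker thread. A minimal, std-only sketch of that pattern (illustrative, not taken from the diff):

use std::panic::{self, AssertUnwindSafe};

fn run_job(name: &str) -> Result<(), String> {
    if name == "pls panic" {
        panic!("AAA");
    }
    Ok(())
}

fn main() {
    for name in ["well-behaved job", "pls panic"] {
        // AssertUnwindSafe mirrors the worker loop: it lets the closure be handed
        // to catch_unwind without proving unwind safety.
        let result = panic::catch_unwind(AssertUnwindSafe(|| run_job(name)));
        match result {
            Ok(Ok(())) => println!("{name}: finished"),
            // A normal error: the pool would mark the job Stuck(JobError::Error).
            Ok(Err(e)) => println!("{name}: job error: {e}"),
            // A panic: the payload (Box<dyn Any + Send>) would be stored as
            // Stuck(JobError::Panic); the worker thread itself keeps running.
            Err(_payload) => println!("{name}: job panicked, worker survives"),
        }
    }
}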
pageserver/src/tenant_mgr.rs

@@ -8,6 +8,8 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
 use crate::storage_sync::index::RemoteIndex;
 use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
 use crate::tenant_config::TenantConfOpt;
+use crate::tenant_jobs::compaction::{CompactionJob, COMPACTION_POOL};
+use crate::tenant_jobs::gc::{GcJob, GC_POOL};
 use crate::thread_mgr;
 use crate::thread_mgr::ThreadKind;
 use crate::timelines;
@@ -173,8 +175,8 @@ pub fn shutdown_all_tenants() {
     drop(m);

     thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
-    thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
-    thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
+    thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollectionWorker), None, None);
+    thread_mgr::shutdown_threads(Some(ThreadKind::CompactionWorker), None, None);

     // Ok, no background threads running anymore. Flush any remaining data in
     // memory to disk.
@@ -262,34 +264,20 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
         // If the tenant is already active, nothing to do.
         TenantState::Active => {}

-        // If it's Idle, launch the compactor and GC threads
+        // If it's Idle, launch the compactor and GC jobs
         TenantState::Idle => {
-            thread_mgr::spawn(
-                ThreadKind::Compactor,
-                Some(tenant_id),
-                None,
-                "Compactor thread",
-                false,
-                move || crate::tenant_threads::compact_loop(tenant_id),
-            )?;
-
-            let gc_spawn_result = thread_mgr::spawn(
-                ThreadKind::GarbageCollector,
-                Some(tenant_id),
-                None,
-                "GC thread",
-                false,
-                move || crate::tenant_threads::gc_loop(tenant_id),
-            )
-            .map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
-            .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
-
-            if let Err(e) = &gc_spawn_result {
-                error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
-                thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
-                return gc_spawn_result;
-            }
+            // Important to activate before scheduling jobs
+            tenant.state = TenantState::Active;
+
+            GC_POOL
+                .get()
+                .unwrap()
+                .queue_job(GcJob { tenant: tenant_id });
+
+            COMPACTION_POOL
+                .get()
+                .unwrap()
+                .queue_job(CompactionJob { tenant: tenant_id });
         }

         TenantState::Stopping => {
pageserver/src/tenant_threads.rs (deleted)

@@ -1,79 +0,0 @@
-//! This module contains functions to serve per-tenant background processes,
-//! such as compaction and GC
-use crate::repository::Repository;
-use crate::tenant_mgr;
-use crate::tenant_mgr::TenantState;
-use anyhow::Result;
-use std::time::Duration;
-use tracing::*;
-use utils::zid::ZTenantId;
-
-///
-/// Compaction thread's main loop
-///
-pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
-    if let Err(err) = compact_loop_ext(tenantid) {
-        error!("compact loop terminated with error: {:?}", err);
-        Err(err)
-    } else {
-        Ok(())
-    }
-}
-
-fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
-    loop {
-        if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
-            break;
-        }
-        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-        let compaction_period = repo.get_compaction_period();
-
-        std::thread::sleep(compaction_period);
-        trace!("compaction thread for tenant {} waking up", tenantid);
-
-        // Compact timelines
-        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-        repo.compaction_iteration()?;
-    }
-
-    trace!(
-        "compaction thread stopped for tenant {} state is {:?}",
-        tenantid,
-        tenant_mgr::get_tenant_state(tenantid)
-    );
-    Ok(())
-}
-
-///
-/// GC thread's main loop
-///
-pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
-    loop {
-        if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
-            break;
-        }
-
-        trace!("gc thread for tenant {} waking up", tenantid);
-        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
-        let gc_horizon = repo.get_gc_horizon();
-        // Garbage collect old files that are not needed for PITR anymore
-        if gc_horizon > 0 {
-            repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
-        }
-
-        // TODO Write it in more adequate way using
-        // condvar.wait_timeout() or something
-        let mut sleep_time = repo.get_gc_period().as_secs();
-        while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
-        {
-            sleep_time -= 1;
-            std::thread::sleep(Duration::from_secs(1));
-        }
-    }
-    trace!(
-        "GC thread stopped for tenant {} state is {:?}",
-        tenantid,
-        tenant_mgr::get_tenant_state(tenantid)
-    );
-    Ok(())
-}
pageserver/src/thread_mgr.rs

@@ -37,7 +37,7 @@ use std::collections::HashMap;
 use std::panic;
 use std::panic::AssertUnwindSafe;
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Condvar, Mutex};
 use std::thread;
 use std::thread::JoinHandle;

@@ -59,6 +59,17 @@ lazy_static! {

     /// Global registry of threads
     static ref THREADS: Mutex<HashMap<u64, Arc<PageServerThread>>> = Mutex::new(HashMap::new());
+
+    // TODO make these per thread for targeted shutdown, also for cleanup.
+    /// Condvars to notify after shutdown request
+    static ref SHUTDOWN_CONDVARS: Mutex<Vec<Arc<Condvar>>> = Mutex::new(Vec::new());
 }

+/// Return a condvar which will receive a notify_all() call when shutdown is requested
+pub fn get_shutdown_aware_condvar() -> Arc<Condvar> {
+    let mut condvars = SHUTDOWN_CONDVARS.lock().unwrap();
+    condvars.push(Arc::new(Condvar::new()));
+    condvars.last().unwrap().clone()
+}
+
 // There is a Tokio watch channel for each thread, which can be used to signal the
@@ -94,11 +105,11 @@ pub enum ThreadKind {
     // Thread that connects to a safekeeper to fetch WAL for one timeline.
     WalReceiver,

-    // Thread that handles compaction of all timelines for a tenant.
-    Compactor,
+    // Worker that does compaction jobs
+    CompactionWorker,

-    // Thread that handles GC of a tenant
-    GarbageCollector,
+    // Worker that does GC jobs
+    GarbageCollectionWorker,

     // Thread that flushes frozen in-memory layers to disk
     LayerFlushThread,
@@ -297,6 +308,10 @@ pub fn shutdown_threads(
     }
     drop(threads);

+    for condvar in SHUTDOWN_CONDVARS.lock().unwrap().iter() {
+        condvar.notify_all();
+    }
+
     for thread in victim_threads {
         info!("waiting for {} to shut down", thread.name);
         if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
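The two thread_mgr.rs additions above work as a pair: `get_shutdown_aware_condvar()` hands each pool a registered condvar to park on, and `shutdown_threads()` calls `notify_all()` on every registered condvar so workers parked in `Pool::worker_main` wake up and re-check `is_shutdown_requested()` instead of sleeping out their timeout. A self-contained, std-only sketch of that pattern (illustrative names, not the crate's actual code):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);

fn main() {
    // One mutex/condvar pair stands in for a pool's job table and its
    // shutdown-aware condvar.
    let pair = Arc::new((Mutex::new(()), Condvar::new()));
    let worker_pair = Arc::clone(&pair);

    let worker = thread::spawn(move || {
        let (lock, condvar) = &*worker_pair;
        let mut guard = lock.lock().unwrap();
        while !SHUTDOWN_REQUESTED.load(Ordering::Relaxed) {
            // Wait for the next deadline (here: a long timeout) or for a
            // shutdown notification, whichever comes first.
            let (g, _timed_out) = condvar
                .wait_timeout(guard, Duration::from_secs(60))
                .unwrap();
            guard = g;
        }
        println!("worker observed shutdown request");
    });

    // Shutdown path: set the flag and notify while holding the same mutex the
    // worker checks under, so the wakeup cannot be missed.
    {
        let _guard = pair.0.lock().unwrap();
        SHUTDOWN_REQUESTED.store(true, Ordering::Relaxed);
        pair.1.notify_all();
    }

    worker.join().unwrap();
}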