make it possible to associate thread with a tenant after thread start

This commit is contained in:
Dmitry Rodionov
2022-06-06 23:57:06 +03:00
committed by Dmitry Rodionov
parent 6cfebc096f
commit 7dc6beacbd
2 changed files with 50 additions and 31 deletions

View File

@@ -370,6 +370,10 @@ impl PageServerHandler {
) -> anyhow::Result<()> {
let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered();
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
thread_mgr::associate_with(Some(tenantid), Some(timelineid));
// Check that the timeline exists
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
@@ -802,7 +806,6 @@ impl postgres_backend::Handler for PageServerHandler {
.map(|h| h.as_str().parse())
.unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// Use tenant's pitr setting
let pitr = repo.get_pitr_interval();
let result = repo.gc_iteration(Some(timelineid), gc_horizon, pitr, true)?;

View File

@@ -108,15 +108,21 @@ pub enum ThreadKind {
StorageSync,
}
struct MutableThreadState {
/// Tenant and timeline that this thread is associated with.
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
/// Handle for waiting for the thread to exit. It can be None, if the
/// the thread has already exited.
join_handle: Option<JoinHandle<()>>,
}
struct PageServerThread {
_thread_id: u64,
kind: ThreadKind,
/// Tenant and timeline that this thread is associated with.
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
name: String,
// To request thread shutdown, set the flag, and send a dummy message to the
@@ -124,9 +130,7 @@ struct PageServerThread {
shutdown_requested: AtomicBool,
shutdown_tx: watch::Sender<()>,
/// Handle for waiting for the thread to exit. It can be None, if the
/// the thread has already exited.
join_handle: Mutex<Option<JoinHandle<()>>>,
mutable: Mutex<MutableThreadState>,
}
/// Launch a new thread
@@ -145,29 +149,27 @@ where
{
let (shutdown_tx, shutdown_rx) = watch::channel(());
let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
let thread = PageServerThread {
let thread = Arc::new(PageServerThread {
_thread_id: thread_id,
kind,
tenant_id,
timeline_id,
name: name.to_string(),
shutdown_requested: AtomicBool::new(false),
shutdown_tx,
join_handle: Mutex::new(None),
};
let thread_rc = Arc::new(thread);
let mut jh_guard = thread_rc.join_handle.lock().unwrap();
mutable: Mutex::new(MutableThreadState {
tenant_id,
timeline_id,
join_handle: None,
}),
});
THREADS
.lock()
.unwrap()
.insert(thread_id, Arc::clone(&thread_rc));
.insert(thread_id, Arc::clone(&thread));
let thread_rc2 = Arc::clone(&thread_rc);
let mut thread_mut = thread.mutable.lock().unwrap();
let thread_cloned = Arc::clone(&thread);
let thread_name = name.to_string();
let join_handle = match thread::Builder::new()
.name(name.to_string())
@@ -175,7 +177,7 @@ where
thread_wrapper(
thread_name,
thread_id,
thread_rc2,
thread_cloned,
shutdown_rx,
shutdown_process_on_error,
f,
@@ -189,8 +191,8 @@ where
return Err(err);
}
};
*jh_guard = Some(join_handle);
drop(jh_guard);
thread_mut.join_handle = Some(join_handle);
drop(thread_mut);
// The thread is now running. Nothing more to do here
Ok(thread_id)
@@ -229,19 +231,20 @@ fn thread_wrapper<F>(
.remove(&thread_id)
.expect("no thread in registry");
let thread_mut = thread.mutable.lock().unwrap();
match result {
Ok(Ok(())) => debug!("Thread '{}' exited normally", thread_name),
Ok(Err(err)) => {
if shutdown_process_on_error {
error!(
"Shutting down: thread '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
shutdown_pageserver(1);
} else {
error!(
"Thread '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
}
}
@@ -249,19 +252,29 @@ fn thread_wrapper<F>(
if shutdown_process_on_error {
error!(
"Shutting down: thread '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
shutdown_pageserver(1);
} else {
error!(
"Thread '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
thread_name, thread.tenant_id, thread.timeline_id, err
thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
);
}
}
}
}
// expected to be called from the thread of the given id.
pub fn associate_with(tenant_id: Option<ZTenantId>, timeline_id: Option<ZTimelineId>) {
CURRENT_THREAD.with(|ct| {
let borrowed = ct.borrow();
let mut thread_mut = borrowed.as_ref().unwrap().mutable.lock().unwrap();
thread_mut.tenant_id = tenant_id;
thread_mut.timeline_id = timeline_id;
});
}
/// Is there a thread running that matches the criteria
/// Signal and wait for threads to shut down.
@@ -285,9 +298,10 @@ pub fn shutdown_threads(
let threads = THREADS.lock().unwrap();
for thread in threads.values() {
let thread_mut = thread.mutable.lock().unwrap();
if (kind.is_none() || Some(thread.kind) == kind)
&& (tenant_id.is_none() || thread.tenant_id == tenant_id)
&& (timeline_id.is_none() || thread.timeline_id == timeline_id)
&& (tenant_id.is_none() || thread_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || thread_mut.timeline_id == timeline_id)
{
thread.shutdown_requested.store(true, Ordering::Relaxed);
// FIXME: handle error?
@@ -298,8 +312,10 @@ pub fn shutdown_threads(
drop(threads);
for thread in victim_threads {
let mut thread_mut = thread.mutable.lock().unwrap();
info!("waiting for {} to shut down", thread.name);
if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
if let Some(join_handle) = thread_mut.join_handle.take() {
drop(thread_mut);
let _ = join_handle.join();
} else {
// The thread had not even fully started yet. Or it was shut down