Compare commits

...

12 Commits

Author SHA1 Message Date
Joonas Koivunen d87549696b chore: clippy::too_many_arguments 2023-12-13 23:07:25 +00:00
Joonas Koivunen 960a29a6fe refactor: cleanup extra cancellation waits 2023-12-13 23:05:54 +00:00
Joonas Koivunen d6f6e9a87b fix: layer backoff 2023-12-13 23:05:54 +00:00
Joonas Koivunen ddae6e2b0a feat: task hierarchy 2023-12-13 23:05:48 +00:00
Joonas Koivunen e021298dec use child_token instead of cloning 2023-12-13 23:05:35 +00:00
Joonas Koivunen 9790a7c2e8 test: allow shutdown_token when #[cfg(test)] 2023-12-13 22:56:31 +00:00
Joonas Koivunen 9660282c69 chore: cleanup unused 2023-12-13 22:41:06 +00:00
Joonas Koivunen 894cd3ddf7 refactor: eviction_task: stop using plain rate_limit 2023-12-13 22:41:06 +00:00
Joonas Koivunen 735c9b3b70 fix: gc lock acquire cancel 2023-12-13 22:33:32 +00:00
Joonas Koivunen e76b24ccc5 fix: initial logical size permit cancel 2023-12-13 22:33:32 +00:00
Joonas Koivunen 6ff2c07cc8 fix: compaction lock and permit cancellable 2023-12-13 22:33:32 +00:00
Joonas Koivunen efd46e478a refactor: split concurrent_background_tasks_rate_limit 2023-12-13 22:33:32 +00:00
19 changed files with 193 additions and 89 deletions
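
Taken together, the commits build a cancellation-token hierarchy for the pageserver: one root token is created at startup, every subsystem and task_mgr task receives a child of it, and process shutdown cancels the root. A minimal sketch of that shape, using the tokio-util API the diff relies on (CancellationToken, child_token, cancelled, cancel); the task names are illustrative, not the pageserver's actual task set:

    use tokio_util::sync::CancellationToken;

    #[tokio::main]
    async fn main() {
        // Root token, analogous to `shutdown_pageserver` below.
        let root = CancellationToken::new();

        // Each subsystem gets a child: cancelling `root` cancels the child,
        // but cancelling a child never propagates back up to `root`.
        let http = root.child_token();
        let libpq = root.child_token();

        let t1 = tokio::spawn(async move {
            http.cancelled().await;
            println!("http listener shutting down");
        });
        let t2 = tokio::spawn(async move {
            libpq.cancelled().await;
            println!("libpq listener shutting down");
        });

        // Process shutdown: one cancel() reaches every descendant token.
        root.cancel();
        let _ = tokio::join!(t1, t2);
    }

The one-way propagation is also why commit e021298dec replaces `.clone()` with `.child_token()`: a clone is the same token, so cancelling it would cancel the whole tree, whereas a child only observes cancellation coming from above.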

View File

@@ -370,13 +370,18 @@ fn start_pageserver(
// Top-level cancellation token for the process // Top-level cancellation token for the process
let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
pageserver::PAGESERVER_SHUTDOWN_TOKEN
.set(shutdown_pageserver.clone())
.map_err(|_| ())
.expect("cannot be set already");
// Set up remote storage client // Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?; let remote_storage = create_remote_storage_client(conf)?;
// Set up deletion queue // Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new( let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(), remote_storage.clone(),
ControlPlaneClient::new(conf, &shutdown_pageserver), ControlPlaneClient::new(conf, shutdown_pageserver.child_token()),
conf, conf,
); );
if let Some(deletion_workers) = deletion_workers { if let Some(deletion_workers) = deletion_workers {
@@ -420,12 +425,12 @@ fn start_pageserver(
deletion_queue_client, deletion_queue_client,
}, },
order, order,
shutdown_pageserver.clone(), shutdown_pageserver.child_token(),
))?; ))?;
let tenant_manager = Arc::new(tenant_manager); let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({ BACKGROUND_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone(); let shutdown_pageserver = shutdown_pageserver.child_token();
let drive_init = async move { let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe // NOTE: unlike many futures in pageserver, this one is cancellation-safe
let guard = scopeguard::guard_on_success((), |_| { let guard = scopeguard::guard_on_success((), |_| {
@@ -516,6 +521,7 @@ fn start_pageserver(
remote_storage.clone(), remote_storage.clone(),
disk_usage_eviction_state.clone(), disk_usage_eviction_state.clone(),
background_jobs_barrier.clone(), background_jobs_barrier.clone(),
shutdown_pageserver.child_token(),
)?; )?;
} }
@@ -536,13 +542,16 @@ fn start_pageserver(
) )
.context("Failed to initialize router state")?, .context("Failed to initialize router state")?,
); );
let cancel = shutdown_pageserver.child_token();
let router = http::make_router(router_state, launch_ts, http_auth.clone())? let router = http::make_router(router_state, launch_ts, http_auth.clone())?
.build() .build()
.map_err(|err| anyhow!(err))?; .map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap(); let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)? let server = hyper::Server::from_tcp(http_listener)?
.serve(service) .serve(service)
.with_graceful_shutdown(task_mgr::shutdown_watcher()); .with_graceful_shutdown(cancel.clone().cancelled_owned());
task_mgr::spawn( task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(), MGMT_REQUEST_RUNTIME.handle(),
@@ -551,6 +560,7 @@ fn start_pageserver(
None, None,
"http endpoint listener", "http endpoint listener",
true, true,
cancel,
async { async {
server.await?; server.await?;
Ok(()) Ok(())
@@ -576,6 +586,7 @@ fn start_pageserver(
None, None,
"consumption metrics collection", "consumption metrics collection",
true, true,
shutdown_pageserver.child_token(),
async move { async move {
// first wait until background jobs are cleared to launch. // first wait until background jobs are cleared to launch.
// //
@@ -624,6 +635,7 @@ fn start_pageserver(
None, None,
"libpq endpoint listener", "libpq endpoint listener",
true, true,
shutdown_pageserver.child_token(),
async move { async move {
page_service::libpq_listener_main( page_service::libpq_listener_main(
conf, conf,
@@ -657,9 +669,8 @@ fn start_pageserver(
signal.name() signal.name()
); );
// This cancels the `shutdown_pageserver` cancellation tree. // This cancels the `shutdown_pageserver` cancellation tree and signals cancellation to
// Right now that tree doesn't reach very far, and `task_mgr` is used instead. // all tasks in the system.
// The plan is to change that over time.
shutdown_pageserver.take(); shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone(); let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone(); let bg_deletion_queue = deletion_queue.clone();
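
One detail in the hunk above: the HTTP server's graceful shutdown switches from `task_mgr::shutdown_watcher()` to `cancel.clone().cancelled_owned()`. `cancelled_owned()` consumes the token and returns an owned future, so the server, which is later moved into a spawned task, does not have to borrow a local token. A minimal illustration (the function name is hypothetical, not the pageserver's actual setup):

    use tokio_util::sync::CancellationToken;

    fn shutdown_future(cancel: &CancellationToken) -> impl std::future::Future<Output = ()> + 'static {
        // Clone first, because cancelled_owned() takes the token by value.
        cancel.clone().cancelled_owned()
    }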

View File

@@ -65,6 +65,7 @@ pub async fn collect_metrics(
None, None,
"synthetic size calculation", "synthetic size calculation",
false, false,
cancel.child_token(),
async move { async move {
calculate_synthetic_size_worker( calculate_synthetic_size_worker(
synthetic_size_calculation_interval, synthetic_size_calculation_interval,

View File

@@ -40,7 +40,7 @@ pub trait ControlPlaneGenerationsApi {
impl ControlPlaneClient { impl ControlPlaneClient {
/// A None return value indicates that the input `conf` object does not have control /// A None return value indicates that the input `conf` object does not have control
/// plane API enabled. /// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> { pub fn new(conf: &'static PageServerConf, cancel: CancellationToken) -> Option<Self> {
let mut url = match conf.control_plane_api.as_ref() { let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(), Some(u) => u.clone(),
None => return None, None => return None,
@@ -67,7 +67,7 @@ impl ControlPlaneClient {
http_client: client.build().expect("Failed to construct HTTP client"), http_client: client.build().expect("Failed to construct HTTP client"),
base_url: url, base_url: url,
node_id: conf.id, node_id: conf.id,
cancel: cancel.clone(), cancel,
}) })
} }

View File

@@ -87,6 +87,7 @@ pub fn launch_disk_usage_global_eviction_task(
storage: GenericRemoteStorage, storage: GenericRemoteStorage,
state: Arc<State>, state: Arc<State>,
background_jobs_barrier: completion::Barrier, background_jobs_barrier: completion::Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else { let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured"); info!("disk usage based eviction task not configured");
@@ -102,6 +103,7 @@ pub fn launch_disk_usage_global_eviction_task(
None, None,
"disk usage based eviction", "disk usage based eviction",
false, false,
cancel,
async move { async move {
let cancel = task_mgr::shutdown_token(); let cancel = task_mgr::shutdown_token();

View File

@@ -49,11 +49,22 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
/// The main cancellation token for the process.
///
/// Should only ever be used to create child tokens.
pub static PAGESERVER_SHUTDOWN_TOKEN: std::sync::OnceLock<tokio_util::sync::CancellationToken> =
std::sync::OnceLock::new();
pub use crate::metrics::preinitialize_metrics; pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument(skip_all, fields(%exit_code))] #[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) { pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
use std::time::Duration; use std::time::Duration;
if let Some(token) = PAGESERVER_SHUTDOWN_TOKEN.get() {
token.cancel();
}
// Shut down the libpq endpoint task. This prevents new connections from // Shut down the libpq endpoint task. This prevents new connections from
// being accepted. // being accepted.
timed( timed(
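
This file and the `start_pageserver` hunk earlier introduce the process-wide root token: created in main, published in a `OnceLock` so code without a handle in scope can derive children, and cancelled first thing on shutdown. A condensed, self-contained sketch of that pattern (the two functions are simplified stand-ins for the real ones):

    use std::sync::OnceLock;
    use tokio_util::sync::CancellationToken;

    static PAGESERVER_SHUTDOWN_TOKEN: OnceLock<CancellationToken> = OnceLock::new();

    fn start_pageserver() -> CancellationToken {
        let shutdown_pageserver = CancellationToken::new();
        PAGESERVER_SHUTDOWN_TOKEN
            .set(shutdown_pageserver.clone())
            .map_err(|_| ())
            .expect("cannot be set already");
        shutdown_pageserver
    }

    fn shutdown_pageserver() {
        // Cancel the root before tearing anything down, so every child token fires.
        if let Some(token) = PAGESERVER_SHUTDOWN_TOKEN.get() {
            token.cancel();
        }
    }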

View File

@@ -166,6 +166,7 @@ pub async fn libpq_listener_main(
None, None,
"serving compute connection task", "serving compute connection task",
false, false,
cancel.child_token(),
page_service_conn_main( page_service_conn_main(
conf, conf,
broker_client.clone(), broker_client.clone(),
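
The libpq listener hunk gives every accepted connection its own `cancel.child_token()`, so cancelling the listener's token also tears down all connection tasks. A self-contained sketch of that shape, assuming tokio's `TcpListener`; `handle_connection` is a placeholder, not the pageserver's actual handler:

    use tokio::net::{TcpListener, TcpStream};
    use tokio_util::sync::CancellationToken;

    async fn listener_main(listener: TcpListener, cancel: CancellationToken) {
        loop {
            let (socket, peer) = tokio::select! {
                res = listener.accept() => match res {
                    Ok(conn) => conn,
                    Err(e) => {
                        eprintln!("accept failed: {e}");
                        continue;
                    }
                },
                _ = cancel.cancelled() => break, // listener asked to shut down
            };

            // Each connection runs under a child of the listener's token.
            let conn_cancel = cancel.child_token();
            tokio::spawn(async move {
                tokio::select! {
                    _ = handle_connection(socket, peer) => {}
                    _ = conn_cancel.cancelled() => {} // shutdown reaches the connection too
                }
            });
        }
    }

    async fn handle_connection(_socket: TcpStream, _peer: std::net::SocketAddr) {
        // Placeholder for the real compute-connection handler.
    }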

View File

@@ -327,6 +327,7 @@ struct PageServerTask {
/// Launch a new task /// Launch a new task
/// Note: if shutdown_process_on_error is set to true failure /// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process /// of the task will lead to shutdown of entire process
#[allow(clippy::too_many_arguments)]
pub fn spawn<F>( pub fn spawn<F>(
runtime: &tokio::runtime::Handle, runtime: &tokio::runtime::Handle,
kind: TaskKind, kind: TaskKind,
@@ -334,12 +335,13 @@ pub fn spawn<F>(
timeline_id: Option<TimelineId>, timeline_id: Option<TimelineId>,
name: &str, name: &str,
shutdown_process_on_error: bool, shutdown_process_on_error: bool,
cancel: CancellationToken,
future: F, future: F,
) -> PageserverTaskId ) -> PageserverTaskId
where where
F: Future<Output = anyhow::Result<()>> + Send + 'static, F: Future<Output = anyhow::Result<()>> + Send + 'static,
{ {
let cancel = CancellationToken::new(); // let cancel = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed); let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
let task = Arc::new(PageServerTask { let task = Arc::new(PageServerTask {
task_id: PageserverTaskId(task_id), task_id: PageserverTaskId(task_id),
@@ -558,9 +560,14 @@ pub async fn shutdown_watcher() {
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or /// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`. /// `tokio::task::JoinSet::spawn`.
pub fn shutdown_token() -> CancellationToken { pub fn shutdown_token() -> CancellationToken {
SHUTDOWN_TOKEN let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
.try_with(|t| t.clone())
.expect("shutdown_token() called in an unexpected task or thread") if cfg!(test) {
res.unwrap_or_default()
} else {
// tests need to call the same paths which need to use get the shutdown token
res.expect("shutdown_token() called in an unexpected task or thread")
}
} }
/// Has the current task been requested to shut down? /// Has the current task been requested to shut down?
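
Two things change in task_mgr here: `spawn()` now receives its `CancellationToken` from the caller instead of minting a fresh, unconnected one, and `shutdown_token()` tolerates being called outside a task_mgr task when running unit tests. A simplified sketch, assuming (as task_mgr does) that the current task's token lives in a tokio task-local; the `spawn` signature here is heavily trimmed:

    use tokio_util::sync::CancellationToken;

    tokio::task_local! {
        static SHUTDOWN_TOKEN: CancellationToken;
    }

    pub fn shutdown_token() -> CancellationToken {
        let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
        if cfg!(test) {
            // Unit tests exercise these code paths directly, without going
            // through spawn(), so fall back to a fresh, never-cancelled token.
            res.unwrap_or_default()
        } else {
            res.expect("shutdown_token() called in an unexpected task or thread")
        }
    }

    pub fn spawn<F>(cancel: CancellationToken, future: F) -> tokio::task::JoinHandle<()>
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        // The caller decides where this task sits in the cancellation tree.
        tokio::spawn(SHUTDOWN_TOKEN.scope(cancel, future))
    }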

View File

@@ -486,6 +486,7 @@ impl Tenant {
ancestor.clone(), ancestor.clone(),
resources, resources,
CreateTimelineCause::Load, CreateTimelineCause::Load,
self.cancel.child_token(),
)?; )?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!( anyhow::ensure!(
@@ -503,7 +504,7 @@ impl Tenant {
.remote_client .remote_client
.as_ref() .as_ref()
.unwrap() .unwrap()
.init_upload_queue(index_part)?; .init_upload_queue(index_part, timeline.cancel.child_token())?;
} else if self.remote_storage.is_some() { } else if self.remote_storage.is_some() {
// No data on the remote storage, but we have local metadata file. We can end up // No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload. // here with timeline_create being interrupted before finishing index part upload.
@@ -511,7 +512,7 @@ impl Tenant {
// If control plane retries timeline creation in the meantime, the mgmt API handler // If control plane retries timeline creation in the meantime, the mgmt API handler
// for timeline creation will coalesce on the upload we queue here. // for timeline creation will coalesce on the upload we queue here.
let rtc = timeline.remote_client.as_ref().unwrap(); let rtc = timeline.remote_client.as_ref().unwrap();
rtc.init_upload_queue_for_empty_remote(&metadata)?; rtc.init_upload_queue_for_empty_remote(&metadata, timeline.cancel.child_token())?;
rtc.schedule_index_upload_for_metadata_update(&metadata)?; rtc.schedule_index_upload_for_metadata_update(&metadata)?;
} }
@@ -605,6 +606,12 @@ impl Tenant {
let tenant_clone = Arc::clone(&tenant); let tenant_clone = Arc::clone(&tenant);
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn); let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
task_mgr::spawn( task_mgr::spawn(
&tokio::runtime::Handle::current(), &tokio::runtime::Handle::current(),
TaskKind::Attach, TaskKind::Attach,
@@ -612,6 +619,7 @@ impl Tenant {
None, None,
"attach tenant", "attach tenant",
false, false,
cancel,
async move { async move {
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state. // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
let make_broken = let make_broken =
@@ -871,8 +879,10 @@ impl Tenant {
// Walk through deleted timelines, resume deletion // Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions { for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
let cancel = self.cancel.child_token();
remote_timeline_client remote_timeline_client
.init_upload_queue_stopped_to_continue_deletion(&index_part) .init_upload_queue_stopped_to_continue_deletion(&index_part, cancel.child_token())
.context("init queue stopped") .context("init queue stopped")
.map_err(LoadLocalTimelineError::ResumeDeletion)?; .map_err(LoadLocalTimelineError::ResumeDeletion)?;
@@ -882,6 +892,7 @@ impl Tenant {
&index_part.metadata, &index_part.metadata,
Some(remote_timeline_client), Some(remote_timeline_client),
self.deletion_queue_client.clone(), self.deletion_queue_client.clone(),
cancel,
) )
.await .await
.context("resume_deletion") .context("resume_deletion")
@@ -1215,7 +1226,7 @@ impl Tenant {
timeline_id, timeline_id,
self.generation, self.generation,
); );
let cancel_clone = cancel.clone(); let cancel_clone = cancel.child_token();
part_downloads.spawn( part_downloads.spawn(
async move { async move {
debug!("starting index part download"); debug!("starting index part download");
@@ -1376,6 +1387,7 @@ impl Tenant {
&local_metadata, &local_metadata,
None, None,
self.deletion_queue_client.clone(), self.deletion_queue_client.clone(),
self.cancel.child_token(),
) )
.await .await
.context("resume deletion") .context("resume deletion")
@@ -2290,6 +2302,7 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>, ancestor: Option<Arc<Timeline>>,
resources: TimelineResources, resources: TimelineResources,
cause: CreateTimelineCause, cause: CreateTimelineCause,
cancel: CancellationToken,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let state = match cause { let state = match cause {
CreateTimelineCause::Load => { CreateTimelineCause::Load => {
@@ -2318,7 +2331,7 @@ impl Tenant {
resources, resources,
pg_version, pg_version,
state, state,
self.cancel.child_token(), cancel,
); );
Ok(timeline) Ok(timeline)
@@ -2391,6 +2404,12 @@ impl Tenant {
} }
}); });
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
Tenant { Tenant {
tenant_shard_id, tenant_shard_id,
shard_identity, shard_identity,
@@ -2410,7 +2429,7 @@ impl Tenant {
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(), cancel,
gate: Gate::new(format!("Tenant<{tenant_shard_id}>")), gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
} }
} }
@@ -3154,8 +3173,11 @@ impl Tenant {
let tenant_shard_id = self.tenant_shard_id; let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id); let resources = self.build_timeline_resources(new_timeline_id);
let cancel = self.cancel.child_token();
if let Some(remote_client) = &resources.remote_client { if let Some(remote_client) = &resources.remote_client {
remote_client.init_upload_queue_for_empty_remote(new_metadata)?; remote_client.init_upload_queue_for_empty_remote(new_metadata, cancel.child_token())?;
} }
let timeline_struct = self let timeline_struct = self
@@ -3165,6 +3187,7 @@ impl Tenant {
ancestor, ancestor,
resources, resources,
CreateTimelineCause::Load, CreateTimelineCause::Load,
cancel,
) )
.context("Failed to create timeline data structure")?; .context("Failed to create timeline data structure")?;

View File

@@ -460,6 +460,12 @@ impl DeleteTenantFlow {
) { ) {
let tenant_shard_id = tenant.tenant_shard_id; let tenant_shard_id = tenant.tenant_shard_id;
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
task_mgr::spawn( task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker, TaskKind::TimelineDeletionWorker,
@@ -467,6 +473,7 @@ impl DeleteTenantFlow {
None, None,
"tenant_delete", "tenant_delete",
false, false,
cancel,
async move { async move {
if let Err(err) = if let Err(err) =
Self::background(guard, conf, remote_storage, tenants, &tenant).await Self::background(guard, conf, remote_storage, tenants, &tenant).await

View File

@@ -283,7 +283,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation" "Emergency mode! Tenants will be attached unsafely using their last known generation"
); );
emergency_generations(tenant_confs) emergency_generations(tenant_confs)
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) { } else if let Some(client) = ControlPlaneClient::new(conf, cancel.child_token()) {
info!("Calling control plane API to re-attach tenants"); info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load. // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach().await { match client.re_attach().await {
@@ -1352,6 +1352,11 @@ pub(crate) async fn detach_tenant(
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory. // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id. // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None; let task_tenant_id = None;
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
task_mgr::spawn( task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest, TaskKind::MgmtRequest,
@@ -1359,6 +1364,7 @@ pub(crate) async fn detach_tenant(
None, None,
"tenant_files_delete", "tenant_files_delete",
false, false,
cancel,
async move { async move {
fs::remove_dir_all(tmp_path.as_path()) fs::remove_dir_all(tmp_path.as_path())
.await .await
@@ -2086,6 +2092,7 @@ pub(crate) async fn immediate_gc(
Some(timeline_id), Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"), &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false, false,
tenant.cancel.child_token(),
async move { async move {
fail::fail_point!("immediate_gc_task_pre"); fail::fail_point!("immediate_gc_task_pre");

View File

@@ -357,9 +357,13 @@ impl RemoteTimelineClient {
/// Initialize the upload queue for a remote storage that already received /// Initialize the upload queue for a remote storage that already received
/// an index file upload, i.e., it's not empty. /// an index file upload, i.e., it's not empty.
/// The given `index_part` must be the one on the remote. /// The given `index_part` must be the one on the remote.
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> { pub fn init_upload_queue(
&self,
index_part: &IndexPart,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap(); let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?; upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?;
self.update_remote_physical_size_gauge(Some(index_part)); self.update_remote_physical_size_gauge(Some(index_part));
info!( info!(
"initialized upload queue from remote index with {} layer files", "initialized upload queue from remote index with {} layer files",
@@ -373,9 +377,10 @@ impl RemoteTimelineClient {
pub fn init_upload_queue_for_empty_remote( pub fn init_upload_queue_for_empty_remote(
&self, &self,
local_metadata: &TimelineMetadata, local_metadata: &TimelineMetadata,
cancel: CancellationToken,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap(); let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata)?; upload_queue.initialize_empty_remote(local_metadata, cancel)?;
self.update_remote_physical_size_gauge(None); self.update_remote_physical_size_gauge(None);
info!("initialized upload queue as empty"); info!("initialized upload queue as empty");
Ok(()) Ok(())
@@ -386,6 +391,7 @@ impl RemoteTimelineClient {
pub fn init_upload_queue_stopped_to_continue_deletion( pub fn init_upload_queue_stopped_to_continue_deletion(
&self, &self,
index_part: &IndexPart, index_part: &IndexPart,
cancel: CancellationToken,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// FIXME: consider newtype for DeletedIndexPart. // FIXME: consider newtype for DeletedIndexPart.
let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!( let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
@@ -394,7 +400,7 @@ impl RemoteTimelineClient {
{ {
let mut upload_queue = self.upload_queue.lock().unwrap(); let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?; upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?;
self.update_remote_physical_size_gauge(Some(index_part)); self.update_remote_physical_size_gauge(Some(index_part));
} }
// also locks upload queue, without dropping the guard above it will be a deadlock // also locks upload queue, without dropping the guard above it will be a deadlock
@@ -1227,6 +1233,7 @@ impl RemoteTimelineClient {
Some(self.timeline_id), Some(self.timeline_id),
"remote upload", "remote upload",
false, false,
upload_queue.cancel.child_token(),
async move { async move {
self_rc.perform_upload_task(task).await; self_rc.perform_upload_task(task).await;
Ok(()) Ok(())
@@ -1561,6 +1568,13 @@ impl RemoteTimelineClient {
dangling_files: HashMap::default(), dangling_files: HashMap::default(),
shutting_down: false, shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
// TODO: this is the only place where we cannot reasonably continue the
// tree
cancel: crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token(),
}; };
let upload_queue = std::mem::replace( let upload_queue = std::mem::replace(

View File

@@ -841,6 +841,7 @@ impl LayerInner {
Some(self.desc.timeline_id), Some(self.desc.timeline_id),
&task_name, &task_name,
false, false,
timeline.cancel.child_token(),
async move { async move {
let client = timeline let client = timeline
@@ -860,6 +861,21 @@ impl LayerInner {
Ok(()) Ok(())
} }
Err(e) => { Err(e) => {
let consecutive_failures =
this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
};
Err(e) Err(e)
} }
}; };
@@ -907,24 +923,7 @@ impl LayerInner {
Ok(permit) Ok(permit)
} }
Ok((Err(e), _permit)) => { Ok((Err(_), _permit)) => Err(DownloadError::DownloadFailed),
// FIXME: this should be with the spawned task and be cancellation sensitive
//
// while we should not need this, this backoff has turned out to be useful with
// a bug of unexpectedly deleted remote layer file (#5787).
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::time::sleep(backoff).await;
Err(DownloadError::DownloadFailed)
}
Err(_gone) => Err(DownloadError::DownloadCancelled), Err(_gone) => Err(DownloadError::DownloadCancelled),
} }
} }
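
This is the "fix: layer backoff" commit: the FIXME in the removed hunk noted that the retry backoff ran in the waiting caller and ignored cancellation, so it is moved into the spawned download task and raced against the shutdown token. A sketch of the new shape; the formula here (1.5^n seconds, capped at 60) only approximates the repository's `utils::backoff::exponential_backoff_duration_seconds(n, 1.5, 60.0)`:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    async fn backoff_after_failure(consecutive_failures: &AtomicUsize, cancel: &CancellationToken) {
        let n = consecutive_failures.fetch_add(1, Ordering::Relaxed);
        let seconds = 1.5f64.powi(n.min(i32::MAX as usize) as i32).min(60.0);
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs_f64(seconds)) => {}
            _ = cancel.cancelled() => {} // shutdown: stop waiting immediately
        }
    }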

View File

@@ -54,31 +54,21 @@ impl BackgroundLoopKind {
} }
} }
pub(crate) enum RateLimitError { pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
Cancelled,
}
pub(crate) async fn concurrent_background_tasks_rate_limit(
loop_kind: BackgroundLoopKind, loop_kind: BackgroundLoopKind,
_ctx: &RequestContext, _ctx: &RequestContext,
cancel: &CancellationToken, ) -> impl Drop {
) -> Result<impl Drop, RateLimitError> {
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT
.with_label_values(&[loop_kind.as_static_str()]) .with_label_values(&[loop_kind.as_static_str()])
.inc(); .inc();
scopeguard::defer!( scopeguard::defer!(
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc(); crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc();
); );
tokio::select! {
permit = CONCURRENT_BACKGROUND_TASKS.acquire() => { match CONCURRENT_BACKGROUND_TASKS.acquire().await {
match permit { Ok(permit) => permit,
Ok(permit) => Ok(permit), Err(_closed) => unreachable!("we never close the semaphore"),
Err(_closed) => unreachable!("we never close the semaphore"),
}
},
_ = cancel.cancelled() => {
Err(RateLimitError::Cancelled)
}
} }
} }
@@ -95,6 +85,7 @@ pub fn start_background_loops(
None, None,
&format!("compactor for tenant {tenant_shard_id}"), &format!("compactor for tenant {tenant_shard_id}"),
false, false,
tenant.cancel.child_token(),
{ {
let tenant = Arc::clone(tenant); let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned(); let background_jobs_can_start = background_jobs_can_start.cloned();
@@ -118,6 +109,7 @@ pub fn start_background_loops(
None, None,
&format!("garbage collector for tenant {tenant_shard_id}"), &format!("garbage collector for tenant {tenant_shard_id}"),
false, false,
tenant.cancel.child_token(),
{ {
let tenant = Arc::clone(tenant); let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned(); let background_jobs_can_start = background_jobs_can_start.cloned();
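
The split renamed the helper to `concurrent_background_tasks_rate_limit_permit` and removed both the `RateLimitError` enum and the token parameter: the helper only acquires a semaphore permit now, and each caller races that acquisition against its own token. A self-contained sketch (the semaphore is passed in here rather than being the module's static, and `caller` stands in for the background loops):

    use tokio::sync::{Semaphore, SemaphorePermit};
    use tokio_util::sync::CancellationToken;

    async fn rate_limit_permit(limiter: &Semaphore) -> SemaphorePermit<'_> {
        match limiter.acquire().await {
            Ok(permit) => permit,
            Err(_closed) => unreachable!("the semaphore is never closed"),
        }
    }

    async fn caller(limiter: &Semaphore, cancel: &CancellationToken) {
        let _permit = tokio::select! {
            permit = rate_limit_permit(limiter) => permit,
            _ = cancel.cancelled() => return, // give up instead of queueing behind the limiter
        };
        // ... run one background-loop iteration while holding the permit ...
    }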

View File

@@ -51,7 +51,7 @@ use crate::tenant::storage_layer::{
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState, ValueReconstructState,
}; };
use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError}; use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{ use crate::tenant::{
layer_map::{LayerMap, SearchResult}, layer_map::{LayerMap, SearchResult},
@@ -708,19 +708,27 @@ impl Timeline {
flags: EnumSet<CompactFlags>, flags: EnumSet<CompactFlags>,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
let _g = self.compaction_lock.lock().await; // most likely the cancellation token is from background task, but in tests it could be the
// request task as well.
let prepare = async move {
let guard = self.compaction_lock.lock().await;
let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Compaction,
ctx,
)
.await;
(guard, permit)
};
// this wait probably never needs any "long time spent" logging, because we already nag if // this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over it's period (20s) which is quite often in production. // compaction task goes over it's period (20s) which is quite often in production.
let _permit = match super::tasks::concurrent_background_tasks_rate_limit( let (_guard, _permit) = tokio::select! {
BackgroundLoopKind::Compaction, tuple = prepare => { tuple },
ctx, _ = self.cancel.cancelled() => return Ok(()),
cancel, _ = cancel.cancelled() => return Ok(()),
)
.await
{
Ok(permit) => permit,
Err(RateLimitError::Cancelled) => return Ok(()),
}; };
let last_record_lsn = self.get_last_record_lsn(); let last_record_lsn = self.get_last_record_lsn();
@@ -1389,6 +1397,7 @@ impl Timeline {
Some(self.timeline_id), Some(self.timeline_id),
"layer flush task", "layer flush task",
false, false,
self.cancel.child_token(),
async move { async move {
let _guard = guard; let _guard = guard;
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error); let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
@@ -1740,6 +1749,7 @@ impl Timeline {
Some(self.timeline_id), Some(self.timeline_id),
"initial size calculation", "initial size calculation",
false, false,
self.cancel.child_token(),
// NB: don't log errors here, task_mgr will do that. // NB: don't log errors here, task_mgr will do that.
async move { async move {
let cancel = task_mgr::shutdown_token(); let cancel = task_mgr::shutdown_token();
@@ -1775,22 +1785,19 @@ impl Timeline {
let skip_concurrency_limiter = &skip_concurrency_limiter; let skip_concurrency_limiter = &skip_concurrency_limiter;
async move { async move {
let cancel = task_mgr::shutdown_token(); let cancel = task_mgr::shutdown_token();
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit( let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::InitialLogicalSizeCalculation, BackgroundLoopKind::InitialLogicalSizeCalculation,
background_ctx, background_ctx,
&cancel,
); );
use crate::metrics::initial_logical_size::StartCircumstances; use crate::metrics::initial_logical_size::StartCircumstances;
let (_maybe_permit, circumstances) = tokio::select! { let (_maybe_permit, circumstances) = tokio::select! {
res = wait_for_permit => { permit = wait_for_permit => {
match res { (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
Err(RateLimitError::Cancelled) => {
return Err(BackgroundCalculationError::Cancelled);
}
}
} }
_ = cancel.cancelled() => {
return Err(BackgroundCalculationError::Cancelled);
},
() = skip_concurrency_limiter.cancelled() => { () = skip_concurrency_limiter.cancelled() => {
// Some action that is part of a end user interaction requested logical size // Some action that is part of a end user interaction requested logical size
// => break out of the rate limit // => break out of the rate limit
@@ -1913,6 +1920,7 @@ impl Timeline {
Some(self.timeline_id), Some(self.timeline_id),
"ondemand logical size calculation", "ondemand logical size calculation",
false, false,
self.cancel.child_token(),
async move { async move {
let res = self_clone let res = self_clone
.logical_size_calculation_task(lsn, cause, &ctx) .logical_size_calculation_task(lsn, cause, &ctx)
@@ -3796,7 +3804,14 @@ impl Timeline {
/// within a layer file. We can only remove the whole file if it's fully /// within a layer file. We can only remove the whole file if it's fully
/// obsolete. /// obsolete.
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> { pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
let _g = self.gc_lock.lock().await; // this is most likely the background tasks, but it might be the spawned task from
// immediate_gc
let cancel = crate::task_mgr::shutdown_token();
let _g = tokio::select! {
guard = self.gc_lock.lock() => guard,
_ = self.cancel.cancelled() => return Ok(GcResult::default()),
_ = cancel.cancelled() => return Ok(GcResult::default()),
};
let timer = self.metrics.garbage_collect_histo.start_timer(); let timer = self.metrics.garbage_collect_histo.start_timer();
fail_point!("before-timeline-gc"); fail_point!("before-timeline-gc");
@@ -4138,6 +4153,7 @@ impl Timeline {
Some(self.timeline_id), Some(self.timeline_id),
"download all remote layers task", "download all remote layers task",
false, false,
self.cancel.child_token(),
async move { async move {
self_clone.download_all_remote_layers(request).await; self_clone.download_all_remote_layers(request).await;
let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap(); let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
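
The compaction entry point above bundles lock acquisition and permit acquisition into a single `prepare` future and races it against both the timeline's own token and the caller-supplied one, returning `Ok(())` without doing any work if either fires. A trimmed-down sketch with stand-in parameters instead of `Timeline`'s fields:

    use tokio::sync::{Mutex, Semaphore};
    use tokio_util::sync::CancellationToken;

    async fn compact(
        compaction_lock: &Mutex<()>,
        limiter: &Semaphore,
        timeline_cancel: &CancellationToken,
        cancel: &CancellationToken,
    ) -> anyhow::Result<()> {
        let prepare = async {
            let guard = compaction_lock.lock().await;
            let permit = limiter.acquire().await.expect("the semaphore is never closed");
            (guard, permit)
        };

        let (_guard, _permit) = tokio::select! {
            tuple = prepare => tuple,
            _ = timeline_cancel.cancelled() => return Ok(()),
            _ = cancel.cancelled() => return Ok(()),
        };

        // ... the actual compaction work runs here, holding both guard and permit ...
        Ok(())
    }

`gc()` in the same file races its lock acquisition the same way, returning `GcResult::default()` when either token is cancelled.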

View File

@@ -6,6 +6,7 @@ use std::{
use anyhow::Context; use anyhow::Context;
use pageserver_api::{models::TimelineState, shard::TenantShardId}; use pageserver_api::{models::TimelineState, shard::TenantShardId};
use tokio::sync::OwnedMutexGuard; use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument, Span}; use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use utils::{crashsafe, fs_ext, id::TimelineId}; use utils::{crashsafe, fs_ext, id::TimelineId};
@@ -406,6 +407,7 @@ impl DeleteTimelineFlow {
local_metadata: &TimelineMetadata, local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>, remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: DeletionQueueClient, deletion_queue_client: DeletionQueueClient,
cancel: CancellationToken,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized. // Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part. // RemoteTimelineClient is the only functioning part.
@@ -421,6 +423,7 @@ impl DeleteTimelineFlow {
// Important. We dont pass ancestor above because it can be missing. // Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here. // Thus we need to skip the validation here.
CreateTimelineCause::Delete, CreateTimelineCause::Delete,
cancel,
) )
.context("create_timeline_struct")?; .context("create_timeline_struct")?;
@@ -532,6 +535,7 @@ impl DeleteTimelineFlow {
Some(timeline_id), Some(timeline_id),
"timeline_delete", "timeline_delete",
false, false,
tenant.cancel.child_token(),
async move { async move {
if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await { if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await {
error!("Error: {err:#}"); error!("Error: {err:#}");

View File

@@ -30,7 +30,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{ tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
tasks::{BackgroundLoopKind, RateLimitError}, tasks::BackgroundLoopKind,
timeline::EvictionError, timeline::EvictionError,
LogicalSizeCalculationCause, Tenant, LogicalSizeCalculationCause, Tenant,
}, },
@@ -67,6 +67,7 @@ impl Timeline {
self.tenant_shard_id, self.timeline_id self.tenant_shard_id, self.timeline_id
), ),
false, false,
self.cancel.child_token(),
async move { async move {
let cancel = task_mgr::shutdown_token(); let cancel = task_mgr::shutdown_token();
tokio::select! { tokio::select! {
@@ -158,15 +159,14 @@ impl Timeline {
) -> ControlFlow<()> { ) -> ControlFlow<()> {
let now = SystemTime::now(); let now = SystemTime::now();
let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit( let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::Eviction, BackgroundLoopKind::Eviction,
ctx, ctx,
cancel, );
)
.await let _permit = tokio::select! {
{ permit = acquire_permit => permit,
Ok(permit) => permit, _ = cancel.cancelled() => return ControlFlow::Break(()),
Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
}; };
// If we evict layers but keep cached values derived from those layers, then // If we evict layers but keep cached values derived from those layers, then

View File

@@ -87,6 +87,7 @@ impl WalReceiver {
Some(timeline_id), Some(timeline_id),
&format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"), &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"),
false, false,
timeline.cancel.child_token(),
async move { async move {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
debug!("WAL receiver manager started, connecting to broker"); debug!("WAL receiver manager started, connecting to broker");

View File

@@ -167,6 +167,7 @@ pub(super) async fn handle_walreceiver_connection(
Some(timeline.timeline_id), Some(timeline.timeline_id),
"walreceiver connection", "walreceiver connection",
false, false,
cancellation.clone(),
async move { async move {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();

View File

@@ -8,6 +8,7 @@ use std::fmt::Debug;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use std::sync::Arc; use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::info; use tracing::info;
use utils::lsn::AtomicLsn; use utils::lsn::AtomicLsn;
@@ -98,6 +99,8 @@ pub(crate) struct UploadQueueInitialized {
/// wait on until one of them stops the queue. The semaphore is closed when /// wait on until one of them stops the queue. The semaphore is closed when
/// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`. /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>, pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
pub(crate) cancel: CancellationToken,
} }
impl UploadQueueInitialized { impl UploadQueueInitialized {
@@ -130,6 +133,7 @@ impl UploadQueue {
pub(crate) fn initialize_empty_remote( pub(crate) fn initialize_empty_remote(
&mut self, &mut self,
metadata: &TimelineMetadata, metadata: &TimelineMetadata,
cancel: CancellationToken,
) -> anyhow::Result<&mut UploadQueueInitialized> { ) -> anyhow::Result<&mut UploadQueueInitialized> {
match self { match self {
UploadQueue::Uninitialized => (), UploadQueue::Uninitialized => (),
@@ -158,6 +162,7 @@ impl UploadQueue {
dangling_files: HashMap::new(), dangling_files: HashMap::new(),
shutting_down: false, shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
cancel,
}; };
*self = UploadQueue::Initialized(state); *self = UploadQueue::Initialized(state);
@@ -167,6 +172,7 @@ impl UploadQueue {
pub(crate) fn initialize_with_current_remote_index_part( pub(crate) fn initialize_with_current_remote_index_part(
&mut self, &mut self,
index_part: &IndexPart, index_part: &IndexPart,
cancel: CancellationToken,
) -> anyhow::Result<&mut UploadQueueInitialized> { ) -> anyhow::Result<&mut UploadQueueInitialized> {
match self { match self {
UploadQueue::Uninitialized => (), UploadQueue::Uninitialized => (),
@@ -207,6 +213,7 @@ impl UploadQueue {
dangling_files: HashMap::new(), dangling_files: HashMap::new(),
shutting_down: false, shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
cancel,
}; };
*self = UploadQueue::Initialized(state); *self = UploadQueue::Initialized(state);
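
The upload queue now stores the `CancellationToken` it was initialized with, which is what lets `RemoteTimelineClient` spawn its "remote upload" tasks as children of the queue's token (the `upload_queue.cancel.child_token()` argument earlier in this diff). A trimmed-down sketch; `upload_task_token` is a hypothetical helper, the real code reads the field directly at the spawn site:

    use tokio_util::sync::CancellationToken;

    struct UploadQueueInitialized {
        // ... queued operations, counters, latest metadata ...
        cancel: CancellationToken,
    }

    impl UploadQueueInitialized {
        fn new(cancel: CancellationToken) -> Self {
            Self { cancel }
        }

        /// Token for an individual upload task spawned from this queue.
        fn upload_task_token(&self) -> CancellationToken {
            self.cancel.child_token()
        }
    }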