From ddae6e2b0a29d14a45ee1d1c66da9e9ae460cc15 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 13 Dec 2023 22:28:35 +0000 Subject: [PATCH] feat: task hierarchy --- pageserver/src/bin/pageserver.rs | 19 ++++++++-- pageserver/src/consumption_metrics.rs | 1 + pageserver/src/disk_usage_eviction_task.rs | 2 + pageserver/src/lib.rs | 11 ++++++ pageserver/src/page_service.rs | 1 + pageserver/src/task_mgr.rs | 3 +- pageserver/src/tenant.rs | 37 +++++++++++++++---- pageserver/src/tenant/delete.rs | 7 ++++ pageserver/src/tenant/mgr.rs | 7 ++++ .../src/tenant/remote_timeline_client.rs | 22 +++++++++-- pageserver/src/tenant/storage_layer/layer.rs | 1 + pageserver/src/tenant/tasks.rs | 2 + pageserver/src/tenant/timeline.rs | 4 ++ pageserver/src/tenant/timeline/delete.rs | 4 ++ .../src/tenant/timeline/eviction_task.rs | 2 + pageserver/src/tenant/timeline/walreceiver.rs | 1 + .../walreceiver/walreceiver_connection.rs | 1 + pageserver/src/tenant/upload_queue.rs | 7 ++++ 18 files changed, 116 insertions(+), 16 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 10b687f805..2d8708aeef 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -370,6 +370,11 @@ fn start_pageserver( // Top-level cancellation token for the process let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); + pageserver::PAGESERVER_SHUTDOWN_TOKEN + .set(shutdown_pageserver.clone()) + .map_err(|_| ()) + .expect("cannot be set already"); + // Set up remote storage client let remote_storage = create_remote_storage_client(conf)?; @@ -516,6 +521,7 @@ fn start_pageserver( remote_storage.clone(), disk_usage_eviction_state.clone(), background_jobs_barrier.clone(), + shutdown_pageserver.child_token(), )?; } @@ -536,13 +542,16 @@ fn start_pageserver( ) .context("Failed to initialize router state")?, ); + + let cancel = shutdown_pageserver.child_token(); + let router = http::make_router(router_state, launch_ts, http_auth.clone())? .build() .map_err(|err| anyhow!(err))?; let service = utils::http::RouterService::new(router).unwrap(); let server = hyper::Server::from_tcp(http_listener)? .serve(service) - .with_graceful_shutdown(task_mgr::shutdown_watcher()); + .with_graceful_shutdown(cancel.clone().cancelled_owned()); task_mgr::spawn( MGMT_REQUEST_RUNTIME.handle(), @@ -551,6 +560,7 @@ fn start_pageserver( None, "http endpoint listener", true, + cancel, async { server.await?; Ok(()) @@ -576,6 +586,7 @@ fn start_pageserver( None, "consumption metrics collection", true, + shutdown_pageserver.child_token(), async move { // first wait until background jobs are cleared to launch. // @@ -624,6 +635,7 @@ fn start_pageserver( None, "libpq endpoint listener", true, + shutdown_pageserver.child_token(), async move { page_service::libpq_listener_main( conf, @@ -657,9 +669,8 @@ fn start_pageserver( signal.name() ); - // This cancels the `shutdown_pageserver` cancellation tree. - // Right now that tree doesn't reach very far, and `task_mgr` is used instead. - // The plan is to change that over time. + // This cancels the `shutdown_pageserver` cancellation tree and signals cancellation to + // all tasks in the system. shutdown_pageserver.take(); let bg_remote_storage = remote_storage.clone(); let bg_deletion_queue = deletion_queue.clone(); diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index bde2cedca7..c65843a74b 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -65,6 +65,7 @@ pub async fn collect_metrics( None, "synthetic size calculation", false, + cancel.child_token(), async move { calculate_synthetic_size_worker( synthetic_size_calculation_interval, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 76906cfaf7..a35f0a12d7 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -87,6 +87,7 @@ pub fn launch_disk_usage_global_eviction_task( storage: GenericRemoteStorage, state: Arc, background_jobs_barrier: completion::Barrier, + cancel: CancellationToken, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -102,6 +103,7 @@ pub fn launch_disk_usage_global_eviction_task( None, "disk usage based eviction", false, + cancel, async move { let cancel = task_mgr::shutdown_token(); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 797cb6f944..7fd64a941e 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -49,11 +49,22 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61; static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); +/// The main cancellation token for the process. +/// +/// Should only ever be used to create child tokens. +pub static PAGESERVER_SHUTDOWN_TOKEN: std::sync::OnceLock = + std::sync::OnceLock::new(); + pub use crate::metrics::preinitialize_metrics; #[tracing::instrument(skip_all, fields(%exit_code))] pub async fn shutdown_pageserver(deletion_queue: Option, exit_code: i32) { use std::time::Duration; + + if let Some(token) = PAGESERVER_SHUTDOWN_TOKEN.get() { + token.cancel(); + } + // Shut down the libpq endpoint task. This prevents new connections from // being accepted. timed( diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d5ca7f7382..b33eb3bd09 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -166,6 +166,7 @@ pub async fn libpq_listener_main( None, "serving compute connection task", false, + cancel.child_token(), page_service_conn_main( conf, broker_client.clone(), diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 37e771560d..881ffc3e63 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -334,12 +334,13 @@ pub fn spawn( timeline_id: Option, name: &str, shutdown_process_on_error: bool, + cancel: CancellationToken, future: F, ) -> PageserverTaskId where F: Future> + Send + 'static, { - let cancel = CancellationToken::new(); + // let cancel = CancellationToken::new(); let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed); let task = Arc::new(PageServerTask { task_id: PageserverTaskId(task_id), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a8e8b4cbfa..a09461f5ff 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -486,6 +486,7 @@ impl Tenant { ancestor.clone(), resources, CreateTimelineCause::Load, + self.cancel.child_token(), )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( @@ -503,7 +504,7 @@ impl Tenant { .remote_client .as_ref() .unwrap() - .init_upload_queue(index_part)?; + .init_upload_queue(index_part, timeline.cancel.child_token())?; } else if self.remote_storage.is_some() { // No data on the remote storage, but we have local metadata file. We can end up // here with timeline_create being interrupted before finishing index part upload. @@ -511,7 +512,7 @@ impl Tenant { // If control plane retries timeline creation in the meantime, the mgmt API handler // for timeline creation will coalesce on the upload we queue here. let rtc = timeline.remote_client.as_ref().unwrap(); - rtc.init_upload_queue_for_empty_remote(&metadata)?; + rtc.init_upload_queue_for_empty_remote(&metadata, timeline.cancel.child_token())?; rtc.schedule_index_upload_for_metadata_update(&metadata)?; } @@ -605,6 +606,12 @@ impl Tenant { let tenant_clone = Arc::clone(&tenant); let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn); + let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN + .get() + .cloned() + .unwrap_or_default() + .child_token(); + task_mgr::spawn( &tokio::runtime::Handle::current(), TaskKind::Attach, @@ -612,6 +619,7 @@ impl Tenant { None, "attach tenant", false, + cancel, async move { // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state. let make_broken = @@ -871,8 +879,10 @@ impl Tenant { // Walk through deleted timelines, resume deletion for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions { + let cancel = self.cancel.child_token(); + remote_timeline_client - .init_upload_queue_stopped_to_continue_deletion(&index_part) + .init_upload_queue_stopped_to_continue_deletion(&index_part, cancel.child_token()) .context("init queue stopped") .map_err(LoadLocalTimelineError::ResumeDeletion)?; @@ -882,6 +892,7 @@ impl Tenant { &index_part.metadata, Some(remote_timeline_client), self.deletion_queue_client.clone(), + cancel, ) .await .context("resume_deletion") @@ -1215,7 +1226,7 @@ impl Tenant { timeline_id, self.generation, ); - let cancel_clone = cancel.clone(); + let cancel_clone = cancel.child_token(); part_downloads.spawn( async move { debug!("starting index part download"); @@ -1376,6 +1387,7 @@ impl Tenant { &local_metadata, None, self.deletion_queue_client.clone(), + self.cancel.child_token(), ) .await .context("resume deletion") @@ -2290,6 +2302,7 @@ impl Tenant { ancestor: Option>, resources: TimelineResources, cause: CreateTimelineCause, + cancel: CancellationToken, ) -> anyhow::Result> { let state = match cause { CreateTimelineCause::Load => { @@ -2318,7 +2331,7 @@ impl Tenant { resources, pg_version, state, - self.cancel.child_token(), + cancel, ); Ok(timeline) @@ -2391,6 +2404,12 @@ impl Tenant { } }); + let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN + .get() + .cloned() + .unwrap_or_default() + .child_token(); + Tenant { tenant_shard_id, shard_identity, @@ -2410,7 +2429,7 @@ impl Tenant { cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), - cancel: CancellationToken::default(), + cancel, gate: Gate::new(format!("Tenant<{tenant_shard_id}>")), } } @@ -3154,8 +3173,11 @@ impl Tenant { let tenant_shard_id = self.tenant_shard_id; let resources = self.build_timeline_resources(new_timeline_id); + + let cancel = self.cancel.child_token(); + if let Some(remote_client) = &resources.remote_client { - remote_client.init_upload_queue_for_empty_remote(new_metadata)?; + remote_client.init_upload_queue_for_empty_remote(new_metadata, cancel.child_token())?; } let timeline_struct = self @@ -3165,6 +3187,7 @@ impl Tenant { ancestor, resources, CreateTimelineCause::Load, + cancel, ) .context("Failed to create timeline data structure")?; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index acd311ace6..4b60537e96 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -460,6 +460,12 @@ impl DeleteTenantFlow { ) { let tenant_shard_id = tenant.tenant_shard_id; + let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN + .get() + .cloned() + .unwrap_or_default() + .child_token(); + task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::TimelineDeletionWorker, @@ -467,6 +473,7 @@ impl DeleteTenantFlow { None, "tenant_delete", false, + cancel, async move { if let Err(err) = Self::background(guard, conf, remote_storage, tenants, &tenant).await diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4587a5159a..8e5b5b6ba0 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1352,6 +1352,11 @@ pub(crate) async fn detach_tenant( // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory. // After a tenant is detached, there are no more task_mgr tasks for that tenant_id. let task_tenant_id = None; + let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN + .get() + .cloned() + .unwrap_or_default() + .child_token(); task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::MgmtRequest, @@ -1359,6 +1364,7 @@ pub(crate) async fn detach_tenant( None, "tenant_files_delete", false, + cancel, async move { fs::remove_dir_all(tmp_path.as_path()) .await @@ -2086,6 +2092,7 @@ pub(crate) async fn immediate_gc( Some(timeline_id), &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"), false, + tenant.cancel.child_token(), async move { fail::fail_point!("immediate_gc_task_pre"); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 3765ff6e7a..a581099e46 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -357,9 +357,13 @@ impl RemoteTimelineClient { /// Initialize the upload queue for a remote storage that already received /// an index file upload, i.e., it's not empty. /// The given `index_part` must be the one on the remote. - pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> { + pub fn init_upload_queue( + &self, + index_part: &IndexPart, + cancel: CancellationToken, + ) -> anyhow::Result<()> { let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?; self.update_remote_physical_size_gauge(Some(index_part)); info!( "initialized upload queue from remote index with {} layer files", @@ -373,9 +377,10 @@ impl RemoteTimelineClient { pub fn init_upload_queue_for_empty_remote( &self, local_metadata: &TimelineMetadata, + cancel: CancellationToken, ) -> anyhow::Result<()> { let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_empty_remote(local_metadata)?; + upload_queue.initialize_empty_remote(local_metadata, cancel)?; self.update_remote_physical_size_gauge(None); info!("initialized upload queue as empty"); Ok(()) @@ -386,6 +391,7 @@ impl RemoteTimelineClient { pub fn init_upload_queue_stopped_to_continue_deletion( &self, index_part: &IndexPart, + cancel: CancellationToken, ) -> anyhow::Result<()> { // FIXME: consider newtype for DeletedIndexPart. let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!( @@ -394,7 +400,7 @@ impl RemoteTimelineClient { { let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?; self.update_remote_physical_size_gauge(Some(index_part)); } // also locks upload queue, without dropping the guard above it will be a deadlock @@ -1227,6 +1233,7 @@ impl RemoteTimelineClient { Some(self.timeline_id), "remote upload", false, + upload_queue.cancel.child_token(), async move { self_rc.perform_upload_task(task).await; Ok(()) @@ -1561,6 +1568,13 @@ impl RemoteTimelineClient { dangling_files: HashMap::default(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + // TODO: this is the only place where we cannot reasonably continue the + // tree + cancel: crate::PAGESERVER_SHUTDOWN_TOKEN + .get() + .cloned() + .unwrap_or_default() + .child_token(), }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 112128ead8..4e0240a66b 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -841,6 +841,7 @@ impl LayerInner { Some(self.desc.timeline_id), &task_name, false, + timeline.cancel.child_token(), async move { let client = timeline diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 2c881990d5..38b35f4a1f 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -85,6 +85,7 @@ pub fn start_background_loops( None, &format!("compactor for tenant {tenant_shard_id}"), false, + tenant.cancel.child_token(), { let tenant = Arc::clone(tenant); let background_jobs_can_start = background_jobs_can_start.cloned(); @@ -108,6 +109,7 @@ pub fn start_background_loops( None, &format!("garbage collector for tenant {tenant_shard_id}"), false, + tenant.cancel.child_token(), { let tenant = Arc::clone(tenant); let background_jobs_can_start = background_jobs_can_start.cloned(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 099134ee1e..4274bd080a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1397,6 +1397,7 @@ impl Timeline { Some(self.timeline_id), "layer flush task", false, + self.cancel.child_token(), async move { let _guard = guard; let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error); @@ -1748,6 +1749,7 @@ impl Timeline { Some(self.timeline_id), "initial size calculation", false, + self.cancel.child_token(), // NB: don't log errors here, task_mgr will do that. async move { let cancel = task_mgr::shutdown_token(); @@ -1921,6 +1923,7 @@ impl Timeline { Some(self.timeline_id), "ondemand logical size calculation", false, + self.cancel.child_token(), async move { let res = self_clone .logical_size_calculation_task(lsn, cause, &ctx) @@ -4153,6 +4156,7 @@ impl Timeline { Some(self.timeline_id), "download all remote layers task", false, + self.cancel.child_token(), async move { self_clone.download_all_remote_layers(request).await; let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap(); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index be873181d9..fd4ad970d4 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -6,6 +6,7 @@ use std::{ use anyhow::Context; use pageserver_api::{models::TimelineState, shard::TenantShardId}; use tokio::sync::OwnedMutexGuard; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn, Instrument, Span}; use utils::{crashsafe, fs_ext, id::TimelineId}; @@ -406,6 +407,7 @@ impl DeleteTimelineFlow { local_metadata: &TimelineMetadata, remote_client: Option, deletion_queue_client: DeletionQueueClient, + cancel: CancellationToken, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -421,6 +423,7 @@ impl DeleteTimelineFlow { // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here. CreateTimelineCause::Delete, + cancel, ) .context("create_timeline_struct")?; @@ -532,6 +535,7 @@ impl DeleteTimelineFlow { Some(timeline_id), "timeline_delete", false, + tenant.cancel.child_token(), async move { if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await { error!("Error: {err:#}"); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index ea5f5f5fa7..6fd7464110 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -67,10 +67,12 @@ impl Timeline { self.tenant_shard_id, self.timeline_id ), false, + self.cancel.child_token(), async move { let cancel = task_mgr::shutdown_token(); tokio::select! { _ = cancel.cancelled() => { return Ok(()); } + _ = self_clone.cancel.cancelled() => { return Ok(()); } _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {} }; diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index e32265afb5..e3b92fb02c 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -87,6 +87,7 @@ impl WalReceiver { Some(timeline_id), &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"), false, + timeline.cancel.child_token(), async move { debug_assert_current_span_has_tenant_and_timeline_id(); debug!("WAL receiver manager started, connecting to broker"); diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 61ab236322..1215bd7313 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -167,6 +167,7 @@ pub(super) async fn handle_walreceiver_connection( Some(timeline.timeline_id), "walreceiver connection", false, + cancellation.clone(), async move { debug_assert_current_span_has_tenant_and_timeline_id(); diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 32f14f40c5..08adc3912b 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -8,6 +8,7 @@ use std::fmt::Debug; use chrono::NaiveDateTime; use std::sync::Arc; +use tokio_util::sync::CancellationToken; use tracing::info; use utils::lsn::AtomicLsn; @@ -98,6 +99,8 @@ pub(crate) struct UploadQueueInitialized { /// wait on until one of them stops the queue. The semaphore is closed when /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`. pub(crate) shutdown_ready: Arc, + + pub(crate) cancel: CancellationToken, } impl UploadQueueInitialized { @@ -130,6 +133,7 @@ impl UploadQueue { pub(crate) fn initialize_empty_remote( &mut self, metadata: &TimelineMetadata, + cancel: CancellationToken, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), @@ -158,6 +162,7 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + cancel, }; *self = UploadQueue::Initialized(state); @@ -167,6 +172,7 @@ impl UploadQueue { pub(crate) fn initialize_with_current_remote_index_part( &mut self, index_part: &IndexPart, + cancel: CancellationToken, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), @@ -207,6 +213,7 @@ impl UploadQueue { dangling_files: HashMap::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + cancel, }; *self = UploadQueue::Initialized(state);