diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e328cd2044..be69f3d67f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -18,7 +18,6 @@ use camino::Utf8Path; use camino::Utf8PathBuf; use enumset::EnumSet; use futures::stream::FuturesUnordered; -use futures::FutureExt; use futures::StreamExt; use pageserver_api::models; use pageserver_api::models::AuxFilePolicy; @@ -34,6 +33,7 @@ use remote_storage::GenericRemoteStorage; use remote_storage::TimeoutOrCancel; use std::collections::BTreeMap; use std::fmt; +use std::future::Future; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; @@ -1031,13 +1031,9 @@ impl Tenant { } Ok(TenantPreload { - timelines: Self::load_timeline_metadata( - self, - remote_timeline_ids, - remote_storage, - cancel, - ) - .await?, + timelines: self + .load_timelines_metadata(remote_timeline_ids, remote_storage, cancel) + .await?, }) } @@ -1303,7 +1299,7 @@ impl Tenant { .await } - async fn load_timeline_metadata( + async fn load_timelines_metadata( self: &Arc, timeline_ids: HashSet, remote_storage: &GenericRemoteStorage, @@ -1311,33 +1307,10 @@ impl Tenant { ) -> anyhow::Result> { let mut part_downloads = JoinSet::new(); for timeline_id in timeline_ids { - let client = RemoteTimelineClient::new( - remote_storage.clone(), - self.deletion_queue_client.clone(), - self.conf, - self.tenant_shard_id, - timeline_id, - self.generation, - ); let cancel_clone = cancel.clone(); part_downloads.spawn( - async move { - debug!("starting index part download"); - - let index_part = client.download_index_file(&cancel_clone).await; - - debug!("finished index part download"); - - Result::<_, anyhow::Error>::Ok(TimelinePreload { - client, - timeline_id, - index_part, - }) - } - .map(move |res| { - res.with_context(|| format!("download index part for timeline {timeline_id}")) - }) - .instrument(info_span!("download_index_part", %timeline_id)), + self.load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone) + .instrument(info_span!("download_index_part", %timeline_id)), ); } @@ -1348,8 +1321,7 @@ impl Tenant { next = part_downloads.join_next() => { match next { Some(result) => { - let preload_result = result.context("join preload task")?; - let preload = preload_result?; + let preload = result.context("join preload task")?; timeline_preloads.insert(preload.timeline_id, preload); }, None => { @@ -1366,6 +1338,36 @@ impl Tenant { Ok(timeline_preloads) } + fn load_timeline_metadata( + self: &Arc, + timeline_id: TimelineId, + remote_storage: GenericRemoteStorage, + cancel: CancellationToken, + ) -> impl Future { + let client = RemoteTimelineClient::new( + remote_storage.clone(), + self.deletion_queue_client.clone(), + self.conf, + self.tenant_shard_id, + timeline_id, + self.generation, + ); + async move { + debug_assert_current_span_has_tenant_and_timeline_id(); + debug!("starting index part download"); + + let index_part = client.download_index_file(&cancel).await; + + debug!("finished index part download"); + + TimelinePreload { + client, + timeline_id, + index_part, + } + } + } + pub(crate) async fn apply_timeline_archival_config( &self, timeline_id: TimelineId,