From 6a9d1030a687d6c4ebd415f702441f09d679fab4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 3 Jan 2023 17:43:54 +0100 Subject: [PATCH] use RemoteTimelineClient for downloading index part during tenant_attach Before this change, we would not .measure_remote_op for index part downloads. And more generally, it's good to pass not just uploads but also downloads through RemoteTimelineClient, e.g., if we ever want to implement some timeline-scoped policies there. Found this while working on https://github.com/neondatabase/neon/pull/3250 where I add a metric to measure the degree of concurrent downloads. Layer download was missing in a test that I added there. --- pageserver/src/tenant.rs | 91 +++++++++++++------ .../src/tenant/remote_timeline_client.rs | 6 +- .../tenant/remote_timeline_client/download.rs | 43 +++------ 3 files changed, 78 insertions(+), 62 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dcaa8ea268..72404e98cd 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -13,11 +13,13 @@ use anyhow::{bail, Context}; use bytes::Bytes; +use futures::FutureExt; use futures::Stream; use pageserver_api::models::TimelineState; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; use tokio::sync::watch; +use tokio::task::JoinSet; use tracing::*; use utils::crashsafe::path_with_suffix_extension; @@ -639,26 +641,62 @@ impl Tenant { .as_ref() .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; - let remote_timelines = remote_timeline_client::list_remote_timelines( + let remote_timeline_ids = remote_timeline_client::list_remote_timelines( remote_storage, self.conf, self.tenant_id, ) .await?; - info!("found {} timelines", remote_timelines.len()); + info!("found {} timelines", remote_timeline_ids.len()); - let mut timeline_ancestors: HashMap = HashMap::new(); - let mut index_parts: HashMap = HashMap::new(); - for (timeline_id, index_part) in remote_timelines { - let remote_metadata = index_part.parse_metadata().with_context(|| { - format!( - "Failed to parse metadata file from remote storage for tenant {} timeline {}", - self.tenant_id, timeline_id - ) - })?; + // Download & parse index parts + let mut part_downloads = JoinSet::new(); + for timeline_id in remote_timeline_ids { + let client = RemoteTimelineClient::new( + remote_storage.clone(), + self.conf, + self.tenant_id, + timeline_id, + ); + part_downloads.spawn( + async move { + debug!("starting index part download"); + + let index_part = client + .download_index_file() + .await + .context("download index file")?; + + let remote_metadata = index_part.parse_metadata().context("parse metadata")?; + + debug!("finished index part download"); + + Result::<_, anyhow::Error>::Ok(( + timeline_id, + client, + index_part, + remote_metadata, + )) + } + .map(move |res| { + res.with_context(|| format!("download index part for timeline {timeline_id}")) + }) + .instrument(info_span!("download_index_part", timeline=%timeline_id)), + ); + } + // Wait for all the download tasks to complete & collect results. + let mut remote_clients = HashMap::new(); + let mut index_parts = HashMap::new(); + let mut timeline_ancestors = HashMap::new(); + while let Some(result) = part_downloads.join_next().await { + // NB: we already added timeline_id as context to the error + let result: Result<_, anyhow::Error> = result.context("joinset task join")?; + let (timeline_id, client, index_part, remote_metadata) = result?; + debug!("successfully downloaded index part for timeline {timeline_id}"); timeline_ancestors.insert(timeline_id, remote_metadata); index_parts.insert(timeline_id, index_part); + remote_clients.insert(timeline_id, client); } // For every timeline, download the metadata file, scan the local directory, @@ -671,7 +709,7 @@ impl Tenant { timeline_id, index_parts.remove(&timeline_id).unwrap(), remote_metadata, - remote_storage.clone(), + remote_clients.remove(&timeline_id).unwrap(), ) .await .with_context(|| { @@ -714,22 +752,19 @@ impl Tenant { Ok(size) } - #[instrument(skip(self, index_part, remote_metadata, remote_storage), fields(timeline_id=%timeline_id))] + #[instrument(skip_all, fields(timeline_id=%timeline_id))] async fn load_remote_timeline( &self, timeline_id: TimelineId, index_part: IndexPart, remote_metadata: TimelineMetadata, - remote_storage: GenericRemoteStorage, + remote_client: RemoteTimelineClient, ) -> anyhow::Result<()> { info!("downloading index file for timeline {}", timeline_id); tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id)) .await .context("Failed to create new timeline directory")?; - let remote_client = - RemoteTimelineClient::new(remote_storage, self.conf, self.tenant_id, timeline_id)?; - let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() { let timelines = self.timelines.lock().unwrap(); Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else( @@ -986,18 +1021,14 @@ impl Tenant { None }; - let remote_client = self - .remote_storage - .as_ref() - .map(|remote_storage| { - RemoteTimelineClient::new( - remote_storage.clone(), - self.conf, - self.tenant_id, - timeline_id, - ) - }) - .transpose()?; + let remote_client = self.remote_storage.as_ref().map(|remote_storage| { + RemoteTimelineClient::new( + remote_storage.clone(), + self.conf, + self.tenant_id, + timeline_id, + ) + }); let remote_startup_data = match &remote_client { Some(remote_client) => match remote_client.download_index_file().await { @@ -2191,7 +2222,7 @@ impl Tenant { self.conf, tenant_id, new_timeline_id, - )?; + ); remote_client.init_upload_queue_for_empty_remote(&new_metadata)?; Some(remote_client) } else { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 45988ff47a..a9f19a4e1d 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -298,8 +298,8 @@ impl RemoteTimelineClient { conf: &'static PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, - ) -> anyhow::Result { - Ok(RemoteTimelineClient { + ) -> RemoteTimelineClient { + RemoteTimelineClient { conf, runtime: &BACKGROUND_RUNTIME, tenant_id, @@ -307,7 +307,7 @@ impl RemoteTimelineClient { storage_impl: remote_storage, upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)), - }) + } } /// Initialize the upload queue for a remote storage that already received diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 422728d1f3..2e79698087 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -8,10 +8,9 @@ use std::future::Future; use std::path::Path; use anyhow::{anyhow, Context}; -use futures::stream::{FuturesUnordered, StreamExt}; use tokio::fs; use tokio::io::AsyncWriteExt; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{error, info, warn}; use crate::config::PageServerConf; use crate::tenant::storage_layer::LayerFileName; @@ -175,7 +174,7 @@ pub async fn list_remote_timelines<'a>( storage: &'a GenericRemoteStorage, conf: &'static PageServerConf, tenant_id: TenantId, -) -> anyhow::Result> { +) -> anyhow::Result> { let tenant_path = conf.timelines_path(&tenant_id); let tenant_storage_path = conf.remote_path(&tenant_path)?; @@ -194,7 +193,6 @@ pub async fn list_remote_timelines<'a>( } let mut timeline_ids = HashSet::new(); - let mut part_downloads = FuturesUnordered::new(); for timeline_remote_storage_key in timelines { let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| { @@ -205,35 +203,22 @@ pub async fn list_remote_timelines<'a>( format!("failed to parse object name into timeline id '{object_name}'") })?; - // list_prefixes returns all files with the prefix. If we haven't seen this timeline ID - // yet, launch a download task for it. - if !timeline_ids.contains(&timeline_id) { - timeline_ids.insert(timeline_id); - let storage_clone = storage.clone(); - part_downloads.push(async move { - ( - timeline_id, - download_index_part(conf, &storage_clone, tenant_id, timeline_id) - .instrument(info_span!("download_index_part", timeline=%timeline_id)) - .await, - ) - }); - } + // list_prefixes is assumed to return unique names. Ensure this here. + // NB: it's safer to bail out than warn-log this because the pageserver + // needs to absolutely know about _all_ timelines that exist, so that + // GC knows all the branchpoints. If we skipped over a timeline instead, + // GC could delete a layer that's still needed by that timeline. + anyhow::ensure!( + !timeline_ids.contains(&timeline_id), + "list_prefixes contains duplicate timeline id {timeline_id}" + ); + timeline_ids.insert(timeline_id); } - // Wait for all the download tasks to complete. - let mut timeline_parts = Vec::new(); - while let Some((timeline_id, part_upload_result)) = part_downloads.next().await { - let index_part = part_upload_result - .with_context(|| format!("Failed to fetch index part for timeline {timeline_id}"))?; - - debug!("Successfully fetched index part for timeline {timeline_id}"); - timeline_parts.push((timeline_id, index_part)); - } - Ok(timeline_parts) + Ok(timeline_ids) } -pub async fn download_index_part( +pub(super) async fn download_index_part( conf: &'static PageServerConf, storage: &GenericRemoteStorage, tenant_id: TenantId,