mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
use RemoteTimelineClient for downloading index part during tenant_attach
Before this change, we would not .measure_remote_op for index part downloads. And more generally, it's good to pass not just uploads but also downloads through RemoteTimelineClient, e.g., if we ever want to implement some timeline-scoped policies there. Found this while working on https://github.com/neondatabase/neon/pull/3250 where I add a metric to measure the degree of concurrent downloads. Layer download was missing in a test that I added there.
This commit is contained in:
committed by
Christian Schwarz
parent
8c6e607327
commit
6a9d1030a6
@@ -13,11 +13,13 @@
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
use futures::Stream;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use remote_storage::DownloadError;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
|
||||
@@ -639,26 +641,62 @@ impl Tenant {
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let remote_timelines = remote_timeline_client::list_remote_timelines(
|
||||
let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
|
||||
remote_storage,
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("found {} timelines", remote_timelines.len());
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
let mut timeline_ancestors: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
|
||||
let mut index_parts: HashMap<TimelineId, IndexPart> = HashMap::new();
|
||||
for (timeline_id, index_part) in remote_timelines {
|
||||
let remote_metadata = index_part.parse_metadata().with_context(|| {
|
||||
format!(
|
||||
"Failed to parse metadata file from remote storage for tenant {} timeline {}",
|
||||
self.tenant_id, timeline_id
|
||||
)
|
||||
})?;
|
||||
// Download & parse index parts
|
||||
let mut part_downloads = JoinSet::new();
|
||||
for timeline_id in remote_timeline_ids {
|
||||
let client = RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
);
|
||||
part_downloads.spawn(
|
||||
async move {
|
||||
debug!("starting index part download");
|
||||
|
||||
let index_part = client
|
||||
.download_index_file()
|
||||
.await
|
||||
.context("download index file")?;
|
||||
|
||||
let remote_metadata = index_part.parse_metadata().context("parse metadata")?;
|
||||
|
||||
debug!("finished index part download");
|
||||
|
||||
Result::<_, anyhow::Error>::Ok((
|
||||
timeline_id,
|
||||
client,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
))
|
||||
}
|
||||
.map(move |res| {
|
||||
res.with_context(|| format!("download index part for timeline {timeline_id}"))
|
||||
})
|
||||
.instrument(info_span!("download_index_part", timeline=%timeline_id)),
|
||||
);
|
||||
}
|
||||
// Wait for all the download tasks to complete & collect results.
|
||||
let mut remote_clients = HashMap::new();
|
||||
let mut index_parts = HashMap::new();
|
||||
let mut timeline_ancestors = HashMap::new();
|
||||
while let Some(result) = part_downloads.join_next().await {
|
||||
// NB: we already added timeline_id as context to the error
|
||||
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
|
||||
let (timeline_id, client, index_part, remote_metadata) = result?;
|
||||
debug!("successfully downloaded index part for timeline {timeline_id}");
|
||||
timeline_ancestors.insert(timeline_id, remote_metadata);
|
||||
index_parts.insert(timeline_id, index_part);
|
||||
remote_clients.insert(timeline_id, client);
|
||||
}
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
@@ -671,7 +709,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
index_parts.remove(&timeline_id).unwrap(),
|
||||
remote_metadata,
|
||||
remote_storage.clone(),
|
||||
remote_clients.remove(&timeline_id).unwrap(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
@@ -714,22 +752,19 @@ impl Tenant {
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, index_part, remote_metadata, remote_storage), fields(timeline_id=%timeline_id))]
|
||||
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
|
||||
async fn load_remote_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
remote_client: RemoteTimelineClient,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("downloading index file for timeline {}", timeline_id);
|
||||
tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
|
||||
.await
|
||||
.context("Failed to create new timeline directory")?;
|
||||
|
||||
let remote_client =
|
||||
RemoteTimelineClient::new(remote_storage, self.conf, self.tenant_id, timeline_id)?;
|
||||
|
||||
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|
||||
@@ -986,18 +1021,14 @@ impl Tenant {
|
||||
None
|
||||
};
|
||||
|
||||
let remote_client = self
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.map(|remote_storage| {
|
||||
RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
)
|
||||
})
|
||||
.transpose()?;
|
||||
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
|
||||
RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
)
|
||||
});
|
||||
|
||||
let remote_startup_data = match &remote_client {
|
||||
Some(remote_client) => match remote_client.download_index_file().await {
|
||||
@@ -2191,7 +2222,7 @@ impl Tenant {
|
||||
self.conf,
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
)?;
|
||||
);
|
||||
remote_client.init_upload_queue_for_empty_remote(&new_metadata)?;
|
||||
Some(remote_client)
|
||||
} else {
|
||||
|
||||
@@ -298,8 +298,8 @@ impl RemoteTimelineClient {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<RemoteTimelineClient> {
|
||||
Ok(RemoteTimelineClient {
|
||||
) -> RemoteTimelineClient {
|
||||
RemoteTimelineClient {
|
||||
conf,
|
||||
runtime: &BACKGROUND_RUNTIME,
|
||||
tenant_id,
|
||||
@@ -307,7 +307,7 @@ impl RemoteTimelineClient {
|
||||
storage_impl: remote_storage,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the upload queue for a remote storage that already received
|
||||
|
||||
@@ -8,10 +8,9 @@ use std::future::Future;
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tracing::{debug, error, info, info_span, warn, Instrument};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
@@ -175,7 +174,7 @@ pub async fn list_remote_timelines<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<Vec<(TimelineId, IndexPart)>> {
|
||||
) -> anyhow::Result<HashSet<TimelineId>> {
|
||||
let tenant_path = conf.timelines_path(&tenant_id);
|
||||
let tenant_storage_path = conf.remote_path(&tenant_path)?;
|
||||
|
||||
@@ -194,7 +193,6 @@ pub async fn list_remote_timelines<'a>(
|
||||
}
|
||||
|
||||
let mut timeline_ids = HashSet::new();
|
||||
let mut part_downloads = FuturesUnordered::new();
|
||||
|
||||
for timeline_remote_storage_key in timelines {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
@@ -205,35 +203,22 @@ pub async fn list_remote_timelines<'a>(
|
||||
format!("failed to parse object name into timeline id '{object_name}'")
|
||||
})?;
|
||||
|
||||
// list_prefixes returns all files with the prefix. If we haven't seen this timeline ID
|
||||
// yet, launch a download task for it.
|
||||
if !timeline_ids.contains(&timeline_id) {
|
||||
timeline_ids.insert(timeline_id);
|
||||
let storage_clone = storage.clone();
|
||||
part_downloads.push(async move {
|
||||
(
|
||||
timeline_id,
|
||||
download_index_part(conf, &storage_clone, tenant_id, timeline_id)
|
||||
.instrument(info_span!("download_index_part", timeline=%timeline_id))
|
||||
.await,
|
||||
)
|
||||
});
|
||||
}
|
||||
// list_prefixes is assumed to return unique names. Ensure this here.
|
||||
// NB: it's safer to bail out than warn-log this because the pageserver
|
||||
// needs to absolutely know about _all_ timelines that exist, so that
|
||||
// GC knows all the branchpoints. If we skipped over a timeline instead,
|
||||
// GC could delete a layer that's still needed by that timeline.
|
||||
anyhow::ensure!(
|
||||
!timeline_ids.contains(&timeline_id),
|
||||
"list_prefixes contains duplicate timeline id {timeline_id}"
|
||||
);
|
||||
timeline_ids.insert(timeline_id);
|
||||
}
|
||||
|
||||
// Wait for all the download tasks to complete.
|
||||
let mut timeline_parts = Vec::new();
|
||||
while let Some((timeline_id, part_upload_result)) = part_downloads.next().await {
|
||||
let index_part = part_upload_result
|
||||
.with_context(|| format!("Failed to fetch index part for timeline {timeline_id}"))?;
|
||||
|
||||
debug!("Successfully fetched index part for timeline {timeline_id}");
|
||||
timeline_parts.push((timeline_id, index_part));
|
||||
}
|
||||
Ok(timeline_parts)
|
||||
Ok(timeline_ids)
|
||||
}
|
||||
|
||||
pub async fn download_index_part(
|
||||
pub(super) async fn download_index_part(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
|
||||
Reference in New Issue
Block a user