diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b79169fd8e..44c5cad6ba 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -29,11 +29,11 @@ use std::cmp::min; use std::collections::hash_map::Entry; use std::collections::BTreeSet; use std::collections::HashMap; +use std::collections::HashSet; use std::fmt::Debug; use std::fmt::Display; use std::fs; use std::fs::File; -use std::io; use std::ops::Bound::Included; use std::process::Command; use std::process::Stdio; @@ -49,7 +49,6 @@ use self::config::AttachmentMode; use self::config::LocationConf; use self::config::TenantConf; use self::delete::DeleteTenantFlow; -use self::metadata::LoadMetadataError; use self::metadata::TimelineMetadata; use self::mgr::TenantsMap; use self::remote_timeline_client::RemoteTimelineClient; @@ -373,6 +372,13 @@ struct RemoteStartupData { remote_metadata: TimelineMetadata, } +struct TimelinePreload { + timeline_id: TimelineId, + remote_client: Option, + index_part: Option, + metadata: TimelineMetadata, +} + #[derive(Debug, thiserror::Error)] pub(crate) enum WaitToBecomeActiveError { WillNotBecomeActive { @@ -413,11 +419,6 @@ pub enum CreateTimelineError { Other(#[from] anyhow::Error), } -struct TenantDirectoryScan { - sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>, - timelines_to_resume_deletion: Vec<(TimelineId, Option)>, -} - enum CreateTimelineCause { Load, Delete, @@ -661,41 +662,14 @@ impl Tenant { Ok(tenant) } - /// - /// Background task that downloads all data for a tenant and brings it to Active state. - /// - /// No background tasks are started as part of this routine. - /// - async fn attach(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { - span::debug_assert_current_span_has_tenant_id(); - - let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id); - if !tokio::fs::try_exists(&marker_file) - .await - .context("check for existence of marker file")? - { - anyhow::bail!( - "implementation error: marker file should exist at beginning of this function" - ); - } - - // Get list of remote timelines - // download index files for every tenant timeline - info!("listing remote timelines"); - - let remote_storage = self - .remote_storage - .as_ref() - .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; - - let remote_timeline_ids = - remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?; - - info!("found {} timelines", remote_timeline_ids.len()); - - // Download & parse index parts + fn download_indices( + &self, + timeline_ids: HashSet, + remote_storage: &GenericRemoteStorage, + ) -> JoinSet> + { let mut part_downloads = JoinSet::new(); - for timeline_id in remote_timeline_ids { + for timeline_id in timeline_ids { let client = RemoteTimelineClient::new( remote_storage.clone(), self.deletion_queue_client.clone(), @@ -724,11 +698,56 @@ impl Tenant { ); } + part_downloads + } + + /// Special variant of preload_timelines that does not rely on remote storage + async fn preload_timelines_local( + self: &Arc, + timeline_ids: &HashSet, + ) -> anyhow::Result> { + let mut preload_map = HashMap::new(); + for timeline_id in timeline_ids { + let metadata = load_metadata(self.conf, &self.tenant_id, timeline_id)?; + preload_map.insert( + *timeline_id, + TimelinePreload { + timeline_id: *timeline_id, + remote_client: None, + // TODO: synthesize an index_part and make it non-optional + index_part: None, + metadata, + }, + ); + } + + // Sort by ancestry + Ok( + tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())? + .into_iter() + .map(|i| i.1) + .collect(), + ) + } + + /// Do the remote I/O and sorting required to prepare a list of timelines + /// with their IndexParts, ready for hydrating into `Timeline` + async fn preload_timelines( + self: &Arc, + timeline_ids: HashSet, + remote_storage: &GenericRemoteStorage, + ) -> anyhow::Result> { + span::debug_assert_current_span_has_tenant_id(); + + let mut part_downloads = self.download_indices(timeline_ids, remote_storage); + let mut timelines_to_resume_deletions = vec![]; + // We construct a map all timeline's preload state, prior to sorting + // it by ancestry at the end of the function + let mut preload_map: HashMap = HashMap::new(); + // Wait for all the download tasks to complete & collect results. - let mut remote_index_and_client = HashMap::new(); - let mut timeline_ancestors = HashMap::new(); while let Some(result) = part_downloads.join_next().await { // NB: we already added timeline_id as context to the error let result: Result<_, anyhow::Error> = result.context("joinset task join")?; @@ -736,8 +755,16 @@ impl Tenant { debug!("successfully downloaded index part for timeline {timeline_id}"); match index_part { MaybeDeletedIndexPart::IndexPart(index_part) => { - timeline_ancestors.insert(timeline_id, index_part.metadata.clone()); - remote_index_and_client.insert(timeline_id, (index_part, client)); + let metadata = index_part.metadata.clone(); + preload_map.insert( + timeline_id, + TimelinePreload { + timeline_id, + remote_client: Some(client), + index_part: Some(index_part), + metadata, + }, + ); } MaybeDeletedIndexPart::Deleted(index_part) => { info!( @@ -749,35 +776,6 @@ impl Tenant { } } - // For every timeline, download the metadata file, scan the local directory, - // and build a layer map that contains an entry for each remote and local - // layer file. - let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?; - for (timeline_id, remote_metadata) in sorted_timelines { - let (index_part, remote_client) = remote_index_and_client - .remove(&timeline_id) - .expect("just put it in above"); - - // TODO again handle early failure - self.load_remote_timeline( - timeline_id, - index_part, - remote_metadata, - TimelineResources { - remote_client: Some(remote_client), - deletion_queue_client: self.deletion_queue_client.clone(), - }, - ctx, - ) - .await - .with_context(|| { - format!( - "failed to load remote timeline {} for tenant {}", - timeline_id, self.tenant_id - ) - })?; - } - // Walk through deleted timelines, resume deletion for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions { remote_timeline_client @@ -798,6 +796,81 @@ impl Tenant { .map_err(LoadLocalTimelineError::ResumeDeletion)?; } + // Sort by ancestry + Ok( + tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())? + .into_iter() + .map(|i| i.1) + .collect(), + ) + } + + /// + /// Background task that downloads all data for a tenant and brings it to Active state. + /// + /// No background tasks are started as part of this routine. + /// + async fn attach(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { + span::debug_assert_current_span_has_tenant_id(); + + let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id); + if !tokio::fs::try_exists(&marker_file) + .await + .context("check for existence of marker file")? + { + anyhow::bail!( + "implementation error: marker file should exist at beginning of this function" + ); + } + + // Get list of remote timelines + info!("listing remote timelines"); + + let remote_storage = self + .remote_storage + .as_ref() + .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; + + let remote_timeline_ids = + remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?; + + info!("found {} timelines", remote_timeline_ids.len()); + + // Download & parse index parts + let sorted_timelines = self + .preload_timelines(remote_timeline_ids, remote_storage) + .await?; + + // For every timeline, download the metadata file, scan the local directory, + // and build a layer map that contains an entry for each remote and local + // layer file. + for timeline_preload in sorted_timelines { + let TimelinePreload { + timeline_id, + remote_client, + index_part, + metadata: _, + } = timeline_preload; + + // TODO again handle early failure + self.load_remote_timeline( + timeline_id, + index_part.unwrap(), + TimelineResources { + remote_client, + deletion_queue_client: self.deletion_queue_client.clone(), + }, + ctx, + ) + .await + .with_context(|| { + format!( + "failed to load remote timeline {} for tenant {}", + timeline_id, self.tenant_id + ) + })?; + } + std::fs::remove_file(&marker_file) .with_context(|| format!("unlink attach marker file {marker_file}"))?; crashsafe::fsync(marker_file.parent().expect("marker file has parent dir")) @@ -830,7 +903,6 @@ impl Tenant { &self, timeline_id: TimelineId, index_part: IndexPart, - remote_metadata: TimelineMetadata, resources: TimelineResources, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -841,7 +913,7 @@ impl Tenant { .await .context("Failed to create new timeline directory")?; - let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() { + let ancestor = if let Some(ancestor_id) = index_part.metadata.ancestor_timeline() { let timelines = self.timelines.lock().unwrap(); Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else( || { @@ -859,6 +931,7 @@ impl Tenant { // cannot be older than the local one let local_metadata = None; + let remote_metadata = index_part.metadata.clone(); self.timeline_init_and_sync( timeline_id, resources, @@ -1032,12 +1105,9 @@ impl Tenant { tenant } - fn scan_and_sort_timelines_dir(self: Arc) -> anyhow::Result { - let mut timelines_to_load: HashMap = HashMap::new(); - // Note timelines_to_resume_deletion needs to be separate because it can be not sortable - // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion - // completed in non topological order (for example because parent has smaller number of layer files in it) - let mut timelines_to_resume_deletion: Vec<(TimelineId, Option)> = vec![]; + async fn scan_timelines_dir(self: &Arc) -> anyhow::Result> { + let mut timelines_to_load: HashSet = HashSet::new(); + let mut timelines_to_resume_deletion: HashSet = HashSet::new(); let timelines_dir = self.conf.timelines_path(&self.tenant_id); @@ -1086,38 +1156,7 @@ impl Tenant { })?; info!("Found deletion mark for timeline {}", timeline_id); - - match load_metadata(self.conf, &self.tenant_id, &timeline_id) { - Ok(metadata) => { - timelines_to_resume_deletion.push((timeline_id, Some(metadata))) - } - Err(e) => match &e { - LoadMetadataError::Read(r) => { - if r.kind() != io::ErrorKind::NotFound { - return Err(anyhow::anyhow!(e)).with_context(|| { - format!("Failed to load metadata for timeline_id {timeline_id}") - }); - } - - // If metadata doesnt exist it means that we've crashed without - // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow. - // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`. - // We cant do it here because the method is async so we'd need block_on - // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations - // so that basically results in a cycle: - // spawn_blocking - // - block_on - // - spawn_blocking - // which can lead to running out of threads in blocing pool. - timelines_to_resume_deletion.push((timeline_id, None)); - } - _ => { - return Err(anyhow::anyhow!(e)).with_context(|| { - format!("Failed to load metadata for timeline_id {timeline_id}") - }) - } - }, - } + timelines_to_resume_deletion.insert(timeline_id); } else { if !timeline_dir.exists() { warn!("Timeline dir entry become invalid: {timeline_dir}"); @@ -1155,9 +1194,7 @@ impl Tenant { let file_name = entry.file_name(); if let Ok(timeline_id) = file_name.parse::() { - let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id) - .context("failed to load metadata")?; - timelines_to_load.insert(timeline_id, metadata); + timelines_to_load.insert(timeline_id); } else { // A file or directory that doesn't look like a timeline ID warn!("unexpected file or directory in timelines directory: {file_name}"); @@ -1165,14 +1202,18 @@ impl Tenant { } } - // Sort the array of timeline IDs into tree-order, so that parent comes before - // all its children. - tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| { - TenantDirectoryScan { - sorted_timelines_to_load: sorted_timelines, - timelines_to_resume_deletion, + for timeline_id in timelines_to_resume_deletion { + if let Err(e) = + DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id).await + { + warn!( + "cannot clean up deleted timeline dir timeline_id: {} error: {:#}", + timeline_id, e + ); } - }) + } + + Ok(timelines_to_load) } /// @@ -1195,24 +1236,34 @@ impl Tenant { // // Scan the directory, peek into the metadata file of each timeline, and // collect a list of timelines and their ancestors. - let span = info_span!("blocking"); let cloned = Arc::clone(self); - let scan = tokio::task::spawn_blocking(move || { - let _g = span.entered(); - cloned.scan_and_sort_timelines_dir() - }) - .await - .context("load spawn_blocking") - .and_then(|res| res)?; + let local_timelines = tokio::task::spawn(async move { cloned.scan_timelines_dir().await }) + .await + .context("load spawn_blocking") + .and_then(|res| res)?; - // FIXME original collect_timeline_files contained one more check: - // 1. "Timeline has no ancestor and no layer files" + let sorted_timelines = match &self.remote_storage { + Some(remote_storage) => { + self.preload_timelines(local_timelines, remote_storage) + .await? + } + None => { + // Deprecated mode, only used in dev. + self.preload_timelines_local(&local_timelines).await? + } + }; + + for timeline_preload in sorted_timelines { + let TimelinePreload { + timeline_id, + remote_client: _, + index_part: _, + metadata, + } = timeline_preload; - // Process loadable timelines first - for (timeline_id, local_metadata) in scan.sorted_timelines_to_load { if let Err(e) = self - .load_local_timeline(timeline_id, local_metadata, init_order, ctx, false) + .load_local_timeline(timeline_id, metadata, init_order, ctx, false) .await { match e { @@ -1229,43 +1280,6 @@ impl Tenant { } } - // Resume deletion ones with deleted_mark - for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion { - match maybe_local_metadata { - None => { - // See comment in `scan_and_sort_timelines_dir`. - if let Err(e) = - DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id) - .await - { - warn!( - "cannot clean up deleted timeline dir timeline_id: {} error: {:#}", - timeline_id, e - ); - } - } - Some(local_metadata) => { - if let Err(e) = self - .load_local_timeline(timeline_id, local_metadata, init_order, ctx, true) - .await - { - match e { - LoadLocalTimelineError::Load(source) => { - // We tried to load deleted timeline, this is a bug. - return Err(anyhow::anyhow!(source).context( - "This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}" - )); - } - LoadLocalTimelineError::ResumeDeletion(source) => { - // Make sure resumed deletion wont fail loading for entire tenant. - error!("Failed to resume timeline deletion: {source:#}") - } - } - } - } - } - } - trace!("Done"); Ok(())