Compare commits

1 Commit

Author: John Spray
SHA1: 51f87d34ca
Message: Unify remote index loading
Date: 2023-10-13 18:13:21 +01:00


@@ -29,11 +29,11 @@ use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Command;
use std::process::Stdio;
@@ -49,7 +49,6 @@ use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
use self::metadata::TimelineMetadata;
use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
@@ -373,6 +372,13 @@ struct RemoteStartupData {
remote_metadata: TimelineMetadata,
}
struct TimelinePreload {
timeline_id: TimelineId,
remote_client: Option<RemoteTimelineClient>,
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
}
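
The new `TimelinePreload` struct is the pivot of this commit: both the remote attach path and the local startup path now produce a sorted `Vec<TimelinePreload>` rather than diverging early. A minimal self-contained sketch of the two construction modes, with stub types standing in for the pageserver's real ones (only the `Option` shape comes from this diff; everything else is illustrative):

// Stub types standing in for the pageserver's real ones.
struct TimelineId(u64);
struct RemoteTimelineClient;
struct IndexPart;
struct TimelineMetadata;

struct TimelinePreload {
    timeline_id: TimelineId,
    remote_client: Option<RemoteTimelineClient>,
    index_part: Option<IndexPart>,
    metadata: TimelineMetadata,
}

fn main() {
    // Remote path (preload_timelines): both optionals are filled from
    // the downloaded index_part.
    let _remote = TimelinePreload {
        timeline_id: TimelineId(1),
        remote_client: Some(RemoteTimelineClient),
        index_part: Some(IndexPart),
        metadata: TimelineMetadata,
    };
    // Local path (preload_timelines_local, deprecated dev-only mode):
    // no remote pieces; the TODO in the diff plans to synthesize an
    // index_part so the field can become non-optional.
    let _local = TimelinePreload {
        timeline_id: TimelineId(2),
        remote_client: None,
        index_part: None,
        metadata: TimelineMetadata,
    };
}
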
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
WillNotBecomeActive {
@@ -413,11 +419,6 @@ pub enum CreateTimelineError {
Other(#[from] anyhow::Error),
}
struct TenantDirectoryScan {
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
}
enum CreateTimelineCause {
Load,
Delete,
@@ -661,41 +662,14 @@ impl Tenant {
Ok(tenant)
}
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
.await
.context("check for existence of marker file")?
{
anyhow::bail!(
"implementation error: marker file should exist at beginning of this function"
);
}
// Get list of remote timelines
// download index files for every tenant timeline
info!("listing remote timelines");
let remote_storage = self
.remote_storage
.as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
let remote_timeline_ids =
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
info!("found {} timelines", remote_timeline_ids.len());
// Download & parse index parts
fn download_indices(
&self,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
) -> JoinSet<Result<(TimelineId, RemoteTimelineClient, MaybeDeletedIndexPart), anyhow::Error>>
{
let mut part_downloads = JoinSet::new();
for timeline_id in remote_timeline_ids {
for timeline_id in timeline_ids {
let client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
@@ -724,11 +698,56 @@ impl Tenant {
);
}
part_downloads
}
/// Special variant of preload_timelines that does not rely on remote storage
async fn preload_timelines_local(
self: &Arc<Self>,
timeline_ids: &HashSet<TimelineId>,
) -> anyhow::Result<Vec<TimelinePreload>> {
let mut preload_map = HashMap::new();
for timeline_id in timeline_ids {
let metadata = load_metadata(self.conf, &self.tenant_id, timeline_id)?;
preload_map.insert(
*timeline_id,
TimelinePreload {
timeline_id: *timeline_id,
remote_client: None,
// TODO: synthesize an index_part and make it non-optional
index_part: None,
metadata,
},
);
}
// Sort by ancestry
Ok(
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
.into_iter()
.map(|i| i.1)
.collect(),
)
}
/// Do the remote I/O and sorting required to prepare a list of timelines
/// with their IndexParts, ready for hydrating into `Timeline`
async fn preload_timelines(
self: &Arc<Self>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
) -> anyhow::Result<Vec<TimelinePreload>> {
span::debug_assert_current_span_has_tenant_id();
let mut part_downloads = self.download_indices(timeline_ids, remote_storage);
let mut timelines_to_resume_deletions = vec![];
// We construct a map of every timeline's preload state, prior to sorting
// it by ancestry at the end of the function
let mut preload_map: HashMap<TimelineId, TimelinePreload> = HashMap::new();
// Wait for all the download tasks to complete & collect results.
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
while let Some(result) = part_downloads.join_next().await {
// NB: we already added timeline_id as context to the error
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
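
`download_indices` fans work out into a tokio `JoinSet`, one task per timeline, and `preload_timelines` drains it with `join_next` in completion order. A minimal runnable sketch of that fan-out/fan-in pattern (the tokio APIs are real; the task body is a stand-in for downloading and parsing one index_part):

use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Fan out: one download task per timeline (IDs here are stand-ins).
    let mut part_downloads: JoinSet<anyhow::Result<(u32, String)>> = JoinSet::new();
    for timeline_id in 0..4u32 {
        part_downloads.spawn(async move {
            // Stand-in for downloading and parsing one index_part.
            Ok((timeline_id, format!("index_part for timeline {timeline_id}")))
        });
    }
    // Fan in: results arrive in completion order, not spawn order.
    while let Some(joined) = part_downloads.join_next().await {
        let (timeline_id, part) = joined??; // JoinError first, task error second
        println!("timeline {timeline_id}: {part}");
    }
    Ok(())
}
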
@@ -736,8 +755,16 @@ impl Tenant {
debug!("successfully downloaded index part for timeline {timeline_id}");
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
remote_index_and_client.insert(timeline_id, (index_part, client));
let metadata = index_part.metadata.clone();
preload_map.insert(
timeline_id,
TimelinePreload {
timeline_id,
remote_client: Some(client),
index_part: Some(index_part),
metadata,
},
);
}
MaybeDeletedIndexPart::Deleted(index_part) => {
info!(
@@ -749,35 +776,6 @@ impl Tenant {
}
}
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
TimelineResources {
remote_client: Some(remote_client),
deletion_queue_client: self.deletion_queue_client.clone(),
},
ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
remote_timeline_client
@@ -798,6 +796,81 @@ impl Tenant {
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
// Sort by ancestry
Ok(
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
.into_iter()
.map(|i| i.1)
.collect(),
)
}
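
Both preload variants end with the same `tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())` call, so callers always receive timelines parent-before-child. That helper already exists elsewhere in this file; the sketch below only illustrates the contract, and its body is an assumption, not the actual implementation:

use std::collections::{HashMap, HashSet};
use std::hash::Hash;

// Hedged sketch of the tree_sort_timelines contract: emit every node
// after its ancestor; bail if an ancestor is missing or there is a cycle.
fn tree_sort<K: Hash + Eq + Copy, V>(
    mut nodes: HashMap<K, V>,
    parent_of: impl Fn(&V) -> Option<K>,
) -> anyhow::Result<Vec<(K, V)>> {
    let mut sorted = Vec::with_capacity(nodes.len());
    let mut emitted: HashSet<K> = HashSet::new();
    while !nodes.is_empty() {
        // A node is ready when it has no parent, or its parent was emitted.
        let ready: Vec<K> = nodes
            .iter()
            .filter(|&(_, v)| parent_of(v).map_or(true, |p| emitted.contains(&p)))
            .map(|(k, _)| *k)
            .collect();
        anyhow::ensure!(!ready.is_empty(), "cycle or missing ancestor");
        for k in ready {
            let v = nodes.remove(&k).expect("key came from the map");
            emitted.insert(k);
            sorted.push((k, v));
        }
    }
    Ok(sorted)
}
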
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
.await
.context("check for existence of marker file")?
{
anyhow::bail!(
"implementation error: marker file should exist at beginning of this function"
);
}
// Get list of remote timelines
info!("listing remote timelines");
let remote_storage = self
.remote_storage
.as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
let remote_timeline_ids =
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
info!("found {} timelines", remote_timeline_ids.len());
// Download & parse index parts
let sorted_timelines = self
.preload_timelines(remote_timeline_ids, remote_storage)
.await?;
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
for timeline_preload in sorted_timelines {
let TimelinePreload {
timeline_id,
remote_client,
index_part,
metadata: _,
} = timeline_preload;
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_part.unwrap(),
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
},
ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
std::fs::remove_file(&marker_file)
.with_context(|| format!("unlink attach marker file {marker_file}"))?;
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
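
The unlink-then-fsync pair above completes the marker-file protocol that `attach` checks at its top: the marker's presence means an attach is in progress, and fsyncing the parent directory makes its removal durable across a crash. A std-only sketch of that completion step (`crashsafe::fsync` is the pageserver's own helper; opening the directory and calling `sync_all` is the usual Unix equivalent):

use std::fs::{self, File};
use std::io;
use std::path::Path;

// Hedged sketch: unlink the marker, then fsync its parent directory so
// the removal survives a crash. On Unix a directory can be opened and
// sync_all'd like a file.
fn complete_attach_marker(marker: &Path) -> io::Result<()> {
    fs::remove_file(marker)?;
    let dir = File::open(marker.parent().expect("marker file has parent dir"))?;
    dir.sync_all()
}
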
@@ -830,7 +903,6 @@ impl Tenant {
&self,
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
resources: TimelineResources,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -841,7 +913,7 @@ impl Tenant {
.await
.context("Failed to create new timeline directory")?;
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
let ancestor = if let Some(ancestor_id) = index_part.metadata.ancestor_timeline() {
let timelines = self.timelines.lock().unwrap();
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|| {
@@ -859,6 +931,7 @@ impl Tenant {
// cannot be older than the local one
let local_metadata = None;
let remote_metadata = index_part.metadata.clone();
self.timeline_init_and_sync(
timeline_id,
resources,
@@ -1032,12 +1105,9 @@ impl Tenant {
tenant
}
fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
// Note: timelines_to_resume_deletion needs to be kept separate because it may not be
// sortable by `tree_sort_timelines`: some parents can be missing, because deletion
// completed in non-topological order (for example because a parent had a smaller
// number of layer files in it)
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
async fn scan_timelines_dir(self: &Arc<Tenant>) -> anyhow::Result<HashSet<TimelineId>> {
let mut timelines_to_load: HashSet<TimelineId> = HashSet::new();
let mut timelines_to_resume_deletion: HashSet<TimelineId> = HashSet::new();
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
@@ -1086,38 +1156,7 @@ impl Tenant {
})?;
info!("Found deletion mark for timeline {}", timeline_id);
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
Ok(metadata) => {
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
}
Err(e) => match &e {
LoadMetadataError::Read(r) => {
if r.kind() != io::ErrorKind::NotFound {
return Err(anyhow::anyhow!(e)).with_context(|| {
format!("Failed to load metadata for timeline_id {timeline_id}")
});
}
// If metadata doesn't exist, it means that we've crashed without
// completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
// So save the timeline_id for a later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
// We can't do it here because that method is async, so we'd need block_on,
// and here we're inside spawn_blocking. cleanup_remaining_timeline_fs_traces
// uses fs operations, so that basically results in a cycle:
// spawn_blocking
// - block_on
//   - spawn_blocking
// which can lead to running out of threads in the blocking pool.
timelines_to_resume_deletion.push((timeline_id, None));
}
_ => {
return Err(anyhow::anyhow!(e)).with_context(|| {
format!("Failed to load metadata for timeline_id {timeline_id}")
})
}
},
}
timelines_to_resume_deletion.insert(timeline_id);
} else {
if !timeline_dir.exists() {
warn!("Timeline dir entry become invalid: {timeline_dir}");
@@ -1155,9 +1194,7 @@ impl Tenant {
let file_name = entry.file_name();
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
timelines_to_load.insert(timeline_id);
} else {
// A file or directory that doesn't look like a timeline ID
warn!("unexpected file or directory in timelines directory: {file_name}");
@@ -1165,14 +1202,18 @@ impl Tenant {
}
}
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
TenantDirectoryScan {
sorted_timelines_to_load: sorted_timelines,
timelines_to_resume_deletion,
for timeline_id in timelines_to_resume_deletion {
if let Err(e) =
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id).await
{
warn!(
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
timeline_id, e
);
}
})
}
Ok(timelines_to_load)
}
///
@@ -1195,24 +1236,34 @@ impl Tenant {
//
// Scan the directory, peek into the metadata file of each timeline, and
// collect a list of timelines and their ancestors.
let span = info_span!("blocking");
let cloned = Arc::clone(self);
let scan = tokio::task::spawn_blocking(move || {
let _g = span.entered();
cloned.scan_and_sort_timelines_dir()
})
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
let local_timelines = tokio::task::spawn(async move { cloned.scan_timelines_dir().await })
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"
let sorted_timelines = match &self.remote_storage {
Some(remote_storage) => {
self.preload_timelines(local_timelines, remote_storage)
.await?
}
None => {
// Deprecated mode, only used in dev.
self.preload_timelines_local(&local_timelines).await?
}
};
for timeline_preload in sorted_timelines {
let TimelinePreload {
timeline_id,
remote_client: _,
index_part: _,
metadata,
} = timeline_preload;
// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
.load_local_timeline(timeline_id, metadata, init_order, ctx, false)
.await
{
match e {
@@ -1229,43 +1280,6 @@ impl Tenant {
}
}
// Resume deletion ones with deleted_mark
for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
match maybe_local_metadata {
None => {
// See comment in `scan_and_sort_timelines_dir`.
if let Err(e) =
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
.await
{
warn!(
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
timeline_id, e
);
}
}
Some(local_metadata) => {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
.await
{
match e {
LoadLocalTimelineError::Load(source) => {
// We tried to load deleted timeline, this is a bug.
return Err(anyhow::anyhow!(source).context(
"This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
));
}
LoadLocalTimelineError::ResumeDeletion(source) => {
// Make sure resumed deletion won't fail loading for the entire tenant.
error!("Failed to resume timeline deletion: {source:#}")
}
}
}
}
}
}
trace!("Done");
Ok(())