Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)

Compare commits: release-pr...jcsp/issue (1 commit)

| Author | SHA1 | Date |
|---|---|---|
|  | 51f87d34ca |  |
@@ -29,11 +29,11 @@ use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Command;
use std::process::Stdio;
@@ -49,7 +49,6 @@ use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
use self::metadata::TimelineMetadata;
use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
@@ -373,6 +372,13 @@ struct RemoteStartupData {
    remote_metadata: TimelineMetadata,
}

struct TimelinePreload {
    timeline_id: TimelineId,
    remote_client: Option<RemoteTimelineClient>,
    index_part: Option<IndexPart>,
    metadata: TimelineMetadata,
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
    WillNotBecomeActive {
@@ -413,11 +419,6 @@ pub enum CreateTimelineError {
    Other(#[from] anyhow::Error),
}

struct TenantDirectoryScan {
    sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
    timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
}

enum CreateTimelineCause {
    Load,
    Delete,
@@ -661,41 +662,14 @@ impl Tenant {
        Ok(tenant)
    }

    ///
    /// Background task that downloads all data for a tenant and brings it to Active state.
    ///
    /// No background tasks are started as part of this routine.
    ///
    async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
        span::debug_assert_current_span_has_tenant_id();

        let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
        if !tokio::fs::try_exists(&marker_file)
            .await
            .context("check for existence of marker file")?
        {
            anyhow::bail!(
                "implementation error: marker file should exist at beginning of this function"
            );
        }

        // Get list of remote timelines
        // download index files for every tenant timeline
        info!("listing remote timelines");

        let remote_storage = self
            .remote_storage
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;

        let remote_timeline_ids =
            remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;

        info!("found {} timelines", remote_timeline_ids.len());

        // Download & parse index parts
    fn download_indices(
        &self,
        timeline_ids: HashSet<TimelineId>,
        remote_storage: &GenericRemoteStorage,
    ) -> JoinSet<Result<(TimelineId, RemoteTimelineClient, MaybeDeletedIndexPart), anyhow::Error>>
    {
        let mut part_downloads = JoinSet::new();
        for timeline_id in remote_timeline_ids {
        for timeline_id in timeline_ids {
            let client = RemoteTimelineClient::new(
                remote_storage.clone(),
                self.deletion_queue_client.clone(),
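For readers following the marker-file guard at the top of attach() above, here is a minimal standalone sketch of the same pattern: check for a required marker with tokio::fs::try_exists and bail out if it is missing. The path and message below are placeholders, not the pageserver's; the real path comes from conf.tenant_attaching_mark_file_path().

use std::path::Path;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical marker path, standing in for tenant_attaching_mark_file_path().
    let marker = Path::new("/tmp/attaching.marker");

    // try_exists surfaces I/O errors (e.g. permission problems) instead of folding them into `false`.
    if !tokio::fs::try_exists(marker).await? {
        anyhow::bail!("marker file should exist before attach starts");
    }
    Ok(())
}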
@@ -724,11 +698,56 @@ impl Tenant {
            );
        }

        part_downloads
    }

    /// Special variant of preload_timelines that does not rely on remote storage
    async fn preload_timelines_local(
        self: &Arc<Self>,
        timeline_ids: &HashSet<TimelineId>,
    ) -> anyhow::Result<Vec<TimelinePreload>> {
        let mut preload_map = HashMap::new();
        for timeline_id in timeline_ids {
            let metadata = load_metadata(self.conf, &self.tenant_id, timeline_id)?;
            preload_map.insert(
                *timeline_id,
                TimelinePreload {
                    timeline_id: *timeline_id,
                    remote_client: None,
                    // TODO: synthesize an index_part and make it non-optional
                    index_part: None,
                    metadata,
                },
            );
        }

        // Sort by ancestry
        Ok(
            tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
                .into_iter()
                .map(|i| i.1)
                .collect(),
        )
    }

    /// Do the remote I/O and sorting required to prepare a list of timelines
    /// with their IndexParts, ready for hydrating into `Timeline`
    async fn preload_timelines(
        self: &Arc<Self>,
        timeline_ids: HashSet<TimelineId>,
        remote_storage: &GenericRemoteStorage,
    ) -> anyhow::Result<Vec<TimelinePreload>> {
        span::debug_assert_current_span_has_tenant_id();

        let mut part_downloads = self.download_indices(timeline_ids, remote_storage);

        let mut timelines_to_resume_deletions = vec![];

        // We construct a map of each timeline's preload state, prior to sorting
        // it by ancestry at the end of the function
        let mut preload_map: HashMap<TimelineId, TimelinePreload> = HashMap::new();

        // Wait for all the download tasks to complete & collect results.
        let mut remote_index_and_client = HashMap::new();
        let mut timeline_ancestors = HashMap::new();
        while let Some(result) = part_downloads.join_next().await {
            // NB: we already added timeline_id as context to the error
            let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
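download_indices and preload_timelines above use tokio's JoinSet to fan out one index-part download per timeline and then drain the results as they complete. A self-contained sketch of that fan-out/fan-in pattern follows; the task body is a stand-in, not the real download code.

use std::collections::HashMap;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // Fan out: one task per id, standing in for one IndexPart download per timeline.
    let mut part_downloads = JoinSet::new();
    for id in 0u32..4 {
        part_downloads.spawn(async move {
            // Hypothetical per-timeline work; the real tasks fetch and parse the index part.
            (id, format!("index for timeline {id}"))
        });
    }

    // Fan in: results arrive in completion order, not spawn order.
    let mut results = HashMap::new();
    while let Some(joined) = part_downloads.join_next().await {
        let (id, index) = joined.expect("download task panicked");
        results.insert(id, index);
    }
    assert_eq!(results.len(), 4);
}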
@@ -736,8 +755,16 @@ impl Tenant {
            debug!("successfully downloaded index part for timeline {timeline_id}");
            match index_part {
                MaybeDeletedIndexPart::IndexPart(index_part) => {
                    timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
                    remote_index_and_client.insert(timeline_id, (index_part, client));
                    let metadata = index_part.metadata.clone();
                    preload_map.insert(
                        timeline_id,
                        TimelinePreload {
                            timeline_id,
                            remote_client: Some(client),
                            index_part: Some(index_part),
                            metadata,
                        },
                    );
                }
                MaybeDeletedIndexPart::Deleted(index_part) => {
                    info!(
@@ -749,35 +776,6 @@ impl Tenant {
                }
            }

        // For every timeline, download the metadata file, scan the local directory,
        // and build a layer map that contains an entry for each remote and local
        // layer file.
        let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
        for (timeline_id, remote_metadata) in sorted_timelines {
            let (index_part, remote_client) = remote_index_and_client
                .remove(&timeline_id)
                .expect("just put it in above");

            // TODO again handle early failure
            self.load_remote_timeline(
                timeline_id,
                index_part,
                remote_metadata,
                TimelineResources {
                    remote_client: Some(remote_client),
                    deletion_queue_client: self.deletion_queue_client.clone(),
                },
                ctx,
            )
            .await
            .with_context(|| {
                format!(
                    "failed to load remote timeline {} for tenant {}",
                    timeline_id, self.tenant_id
                )
            })?;
        }

        // Walk through deleted timelines, resume deletion
        for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
            remote_timeline_client
@@ -798,6 +796,81 @@ impl Tenant {
                .map_err(LoadLocalTimelineError::ResumeDeletion)?;
        }

        // Sort by ancestry
        Ok(
            tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
                .into_iter()
                .map(|i| i.1)
                .collect(),
        )
    }

    ///
    /// Background task that downloads all data for a tenant and brings it to Active state.
    ///
    /// No background tasks are started as part of this routine.
    ///
    async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
        span::debug_assert_current_span_has_tenant_id();

        let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
        if !tokio::fs::try_exists(&marker_file)
            .await
            .context("check for existence of marker file")?
        {
            anyhow::bail!(
                "implementation error: marker file should exist at beginning of this function"
            );
        }

        // Get list of remote timelines
        info!("listing remote timelines");

        let remote_storage = self
            .remote_storage
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;

        let remote_timeline_ids =
            remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;

        info!("found {} timelines", remote_timeline_ids.len());

        // Download & parse index parts
        let sorted_timelines = self
            .preload_timelines(remote_timeline_ids, remote_storage)
            .await?;

        // For every timeline, download the metadata file, scan the local directory,
        // and build a layer map that contains an entry for each remote and local
        // layer file.
        for timeline_preload in sorted_timelines {
            let TimelinePreload {
                timeline_id,
                remote_client,
                index_part,
                metadata: _,
            } = timeline_preload;

            // TODO again handle early failure
            self.load_remote_timeline(
                timeline_id,
                index_part.unwrap(),
                TimelineResources {
                    remote_client,
                    deletion_queue_client: self.deletion_queue_client.clone(),
                },
                ctx,
            )
            .await
            .with_context(|| {
                format!(
                    "failed to load remote timeline {} for tenant {}",
                    timeline_id, self.tenant_id
                )
            })?;
        }

        std::fs::remove_file(&marker_file)
            .with_context(|| format!("unlink attach marker file {marker_file}"))?;
        crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
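Both preload paths finish with tree_sort_timelines, which orders timelines so that every ancestor comes before its descendants. Below is a rough illustration of such an ancestry-first sort, assuming only a map of entries and a way to read an optional ancestor id from each; the real pageserver function differs in details (for instance, it returns errors rather than panicking on bad input).

use std::collections::HashMap;

// Illustrative ancestry-first sort: an entry is emitted only once its ancestor
// (if it has one) is no longer waiting in the map.
fn sort_by_ancestry<V>(
    mut nodes: HashMap<u64, V>,
    ancestor_of: impl Fn(&V) -> Option<u64>,
) -> Vec<(u64, V)> {
    let mut sorted = Vec::with_capacity(nodes.len());
    while !nodes.is_empty() {
        // Ready = ancestor already emitted (or not present at all).
        let ready: Vec<u64> = nodes
            .iter()
            .filter(|(_, v)| ancestor_of(v).map_or(true, |p| !nodes.contains_key(&p)))
            .map(|(id, _)| *id)
            .collect();
        assert!(!ready.is_empty(), "ancestry cycle");
        for id in ready {
            let v = nodes.remove(&id).unwrap();
            sorted.push((id, v));
        }
    }
    sorted
}

fn main() {
    // 2 descends from 1, which descends from 3; expect 3 before 1 before 2.
    let nodes = HashMap::from([(1u64, Some(3u64)), (2, Some(1)), (3, None)]);
    let order: Vec<u64> = sort_by_ancestry(nodes, |p| *p)
        .into_iter()
        .map(|(id, _)| id)
        .collect();
    assert_eq!(order, vec![3, 1, 2]);
}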
@@ -830,7 +903,6 @@ impl Tenant {
        &self,
        timeline_id: TimelineId,
        index_part: IndexPart,
        remote_metadata: TimelineMetadata,
        resources: TimelineResources,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
@@ -841,7 +913,7 @@ impl Tenant {
            .await
            .context("Failed to create new timeline directory")?;

        let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
        let ancestor = if let Some(ancestor_id) = index_part.metadata.ancestor_timeline() {
            let timelines = self.timelines.lock().unwrap();
            Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
                || {
@@ -859,6 +931,7 @@ impl Tenant {
        // cannot be older than the local one
        let local_metadata = None;

        let remote_metadata = index_part.metadata.clone();
        self.timeline_init_and_sync(
            timeline_id,
            resources,
@@ -1032,12 +1105,9 @@ impl Tenant {
        tenant
    }

    fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
        let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
        // Note timelines_to_resume_deletion needs to be separate because it can be unsortable
        // from the point of view of `tree_sort_timelines`, i.e. some parents can be missing
        // because deletion completed in non-topological order (for example because the parent
        // has a smaller number of layer files in it)
        let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
    async fn scan_timelines_dir(self: &Arc<Tenant>) -> anyhow::Result<HashSet<TimelineId>> {
        let mut timelines_to_load: HashSet<TimelineId> = HashSet::new();
        let mut timelines_to_resume_deletion: HashSet<TimelineId> = HashSet::new();

        let timelines_dir = self.conf.timelines_path(&self.tenant_id);
@@ -1086,38 +1156,7 @@ impl Tenant {
            })?;

            info!("Found deletion mark for timeline {}", timeline_id);

            match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
                Ok(metadata) => {
                    timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
                }
                Err(e) => match &e {
                    LoadMetadataError::Read(r) => {
                        if r.kind() != io::ErrorKind::NotFound {
                            return Err(anyhow::anyhow!(e)).with_context(|| {
                                format!("Failed to load metadata for timeline_id {timeline_id}")
                            });
                        }

                        // If metadata doesn't exist it means that we've crashed without
                        // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
                        // So save timeline_id for a later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
                        // We can't do it here because the method is async, so we'd need block_on,
                        // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
                        // so that basically results in a cycle:
                        // spawn_blocking
                        // - block_on
                        // - spawn_blocking
                        // which can lead to running out of threads in the blocking pool.
                        timelines_to_resume_deletion.push((timeline_id, None));
                    }
                    _ => {
                        return Err(anyhow::anyhow!(e)).with_context(|| {
                            format!("Failed to load metadata for timeline_id {timeline_id}")
                        })
                    }
                },
            }
            timelines_to_resume_deletion.insert(timeline_id);
        } else {
            if !timeline_dir.exists() {
                warn!("Timeline dir entry become invalid: {timeline_dir}");
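The removed comment above spells out why the old synchronous scan could not call the async cleanup in place: block_on inside spawn_blocking nests blocking work and can exhaust the blocking thread pool. Here is a hedged sketch of one shape that avoids the cycle, collecting ids in the blocking closure and running the async cleanup afterwards; the names are placeholders, and the diff itself goes further by making the whole scan async.

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Blocking filesystem scan: return plain data, do no async work inside the closure.
    let to_clean: Vec<u64> = tokio::task::spawn_blocking(|| {
        // ... read directory entries, collect ids of timelines marked as deleted ...
        vec![1u64, 2, 3]
    })
    .await?;

    // Back on the async runtime: resume the async cleanup for each collected id,
    // instead of calling block_on from inside the blocking closure above.
    for id in to_clean {
        cleanup_remaining_traces(id).await?;
    }
    Ok(())
}

// Stand-in for DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces.
async fn cleanup_remaining_traces(_timeline_id: u64) -> anyhow::Result<()> {
    Ok(())
}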
@@ -1155,9 +1194,7 @@ impl Tenant {

            let file_name = entry.file_name();
            if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
                let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
                    .context("failed to load metadata")?;
                timelines_to_load.insert(timeline_id, metadata);
                timelines_to_load.insert(timeline_id);
            } else {
                // A file or directory that doesn't look like a timeline ID
                warn!("unexpected file or directory in timelines directory: {file_name}");
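The loop above walks the tenant's timelines directory and treats every entry whose name parses as a TimelineId as a timeline. A small sketch of the same directory walk done asynchronously with tokio::fs::read_dir; the path is a placeholder and ids are kept as plain strings here, whereas the real code parses them into TimelineId and warns on anything that does not parse.

use std::collections::HashSet;
use std::path::Path;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical directory, standing in for conf.timelines_path(&tenant_id).
    let timelines_dir = Path::new("/tmp/timelines");
    let mut timeline_ids: HashSet<String> = HashSet::new();

    let mut entries = tokio::fs::read_dir(timelines_dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        let file_name = entry.file_name();
        // The real code parses the name into a TimelineId and warns on anything that does not parse.
        if let Some(name) = file_name.to_str() {
            timeline_ids.insert(name.to_owned());
        }
    }
    println!("found {} timeline dirs", timeline_ids.len());
    Ok(())
}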
@@ -1165,14 +1202,18 @@ impl Tenant {
            }
        }

        // Sort the array of timeline IDs into tree-order, so that parent comes before
        // all its children.
        tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
            TenantDirectoryScan {
                sorted_timelines_to_load: sorted_timelines,
                timelines_to_resume_deletion,
        for timeline_id in timelines_to_resume_deletion {
            if let Err(e) =
                DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id).await
            {
                warn!(
                    "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
                    timeline_id, e
                );
            }
            })
        }

        Ok(timelines_to_load)
    }

    ///
@@ -1195,24 +1236,34 @@ impl Tenant {
        //
        // Scan the directory, peek into the metadata file of each timeline, and
        // collect a list of timelines and their ancestors.
        let span = info_span!("blocking");
        let cloned = Arc::clone(self);

        let scan = tokio::task::spawn_blocking(move || {
            let _g = span.entered();
            cloned.scan_and_sort_timelines_dir()
        })
        .await
        .context("load spawn_blocking")
        .and_then(|res| res)?;
        let local_timelines = tokio::task::spawn(async move { cloned.scan_timelines_dir().await })
            .await
            .context("load spawn_blocking")
            .and_then(|res| res)?;

        // FIXME original collect_timeline_files contained one more check:
        // 1. "Timeline has no ancestor and no layer files"
        let sorted_timelines = match &self.remote_storage {
            Some(remote_storage) => {
                self.preload_timelines(local_timelines, remote_storage)
                    .await?
            }
            None => {
                // Deprecated mode, only used in dev.
                self.preload_timelines_local(&local_timelines).await?
            }
        };

        for timeline_preload in sorted_timelines {
            let TimelinePreload {
                timeline_id,
                remote_client: _,
                index_part: _,
                metadata,
            } = timeline_preload;

        // Process loadable timelines first
        for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
            if let Err(e) = self
                .load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
                .load_local_timeline(timeline_id, metadata, init_order, ctx, false)
                .await
            {
                match e {
@@ -1229,43 +1280,6 @@ impl Tenant {
                }
            }

        // Resume deletion for the ones with deleted_mark
        for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
            match maybe_local_metadata {
                None => {
                    // See comment in `scan_and_sort_timelines_dir`.
                    if let Err(e) =
                        DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
                            .await
                    {
                        warn!(
                            "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
                            timeline_id, e
                        );
                    }
                }
                Some(local_metadata) => {
                    if let Err(e) = self
                        .load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
                        .await
                    {
                        match e {
                            LoadLocalTimelineError::Load(source) => {
                                // We tried to load a deleted timeline; this is a bug.
                                return Err(anyhow::anyhow!(source).context(
                                    "This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
                                ));
                            }
                            LoadLocalTimelineError::ResumeDeletion(source) => {
                                // Make sure resumed deletion won't fail loading for the entire tenant.
                                error!("Failed to resume timeline deletion: {source:#}")
                            }
                        }
                    }
                }
            }
        }

        trace!("Done");

        Ok(())