mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
dedup cleanup fs traces (#4778)
This is a follow up for discussion: https://github.com/neondatabase/neon/pull/4552#discussion_r1253417777 see context there
This commit is contained in:
@@ -23,7 +23,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::fs_ext;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
@@ -32,6 +31,7 @@ use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::ops::Bound::Included;
|
||||
use std::path::Path;
|
||||
@@ -47,6 +47,7 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTimelineFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
@@ -338,7 +339,7 @@ pub enum CreateTimelineError {
|
||||
|
||||
struct TenantDirectoryScan {
|
||||
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
|
||||
timelines_to_resume_deletion: Vec<(TimelineId, TimelineMetadata)>,
|
||||
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
|
||||
}
|
||||
|
||||
enum CreateTimelineCause {
|
||||
@@ -818,7 +819,8 @@ impl Tenant {
|
||||
// Note timelines_to_resume_deletion needs to be separate because it can be not sortable
|
||||
// from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
|
||||
// completed in non topological order (for example because parent has smaller number of layer files in it)
|
||||
let mut timelines_to_resume_deletion: Vec<(TimelineId, TimelineMetadata)> = vec![];
|
||||
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
|
||||
|
||||
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
|
||||
|
||||
for entry in
|
||||
@@ -868,7 +870,6 @@ impl Tenant {
|
||||
}
|
||||
} else if crate::is_delete_mark(&timeline_dir) {
|
||||
// If metadata exists, load as usual, continue deletion
|
||||
// If metadata doesnt exist remove timeline dir and delete mark
|
||||
let timeline_id =
|
||||
TimelineId::try_from(timeline_dir.file_stem()).with_context(|| {
|
||||
format!(
|
||||
@@ -877,33 +878,37 @@ impl Tenant {
|
||||
)
|
||||
})?;
|
||||
|
||||
let metadata_path = self.conf.metadata_path(&self.tenant_id, &timeline_id);
|
||||
if metadata_path.exists() {
|
||||
// Remote deletion did not finish. Need to resume.
|
||||
timelines_to_resume_deletion.push((
|
||||
timeline_id,
|
||||
load_metadata(self.conf, &self.tenant_id, &timeline_id)?,
|
||||
));
|
||||
continue;
|
||||
}
|
||||
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
|
||||
Ok(metadata) => {
|
||||
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
|
||||
}
|
||||
Err(e) => match &e {
|
||||
LoadMetadataError::Read(r) => {
|
||||
if r.kind() != io::ErrorKind::NotFound {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
});
|
||||
}
|
||||
|
||||
// Missing metadata means that timeline directory should be empty at this point.
|
||||
// Remove delete mark afterwards.
|
||||
// Note that failure during the process wont prevent tenant from successfully loading.
|
||||
// TODO: this is very much similar to DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces
|
||||
// but here we're inside spawn_blocking.
|
||||
if let Err(e) = fs_ext::ignore_absent_files(|| {
|
||||
fs::remove_dir(self.conf.timeline_path(&self.tenant_id, &timeline_id))
|
||||
})
|
||||
.context("remove deleted timeline dir")
|
||||
.and_then(|_| fs::remove_file(&timeline_dir).context("remove delete mark"))
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir at: {} error: {:#}",
|
||||
timeline_dir.display(),
|
||||
e
|
||||
);
|
||||
};
|
||||
// If metadata doesnt exist it means that we've crashed without
|
||||
// completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
|
||||
// So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
|
||||
// We cant do it here because the method is async so we'd need block_on
|
||||
// and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
|
||||
// so that basically results in a cycle:
|
||||
// spawn_blocking
|
||||
// - block_on
|
||||
// - spawn_blocking
|
||||
// which can lead to running out of threads in blocing pool.
|
||||
timelines_to_resume_deletion.push((timeline_id, None));
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
})
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
if !timeline_dir.exists() {
|
||||
warn!(
|
||||
@@ -1022,21 +1027,37 @@ impl Tenant {
|
||||
}
|
||||
|
||||
// Resume deletion ones with deleted_mark
|
||||
for (timeline_id, local_metadata) in scan.timelines_to_resume_deletion {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
LoadLocalTimelineError::Load(source) => {
|
||||
// We tried to load deleted timeline, this is a bug.
|
||||
return Err(anyhow::anyhow!(source).context(
|
||||
"This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
|
||||
));
|
||||
for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
|
||||
match maybe_local_metadata {
|
||||
None => {
|
||||
// See comment in `scan_and_sort_timelines_dir`.
|
||||
if let Err(e) =
|
||||
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
|
||||
timeline_id, e
|
||||
);
|
||||
}
|
||||
LoadLocalTimelineError::ResumeDeletion(source) => {
|
||||
// Make sure resumed deletion wont fail loading for entire tenant.
|
||||
error!("Failed to resume timeline deletion: {source:#}")
|
||||
}
|
||||
Some(local_metadata) => {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
LoadLocalTimelineError::Load(source) => {
|
||||
// We tried to load deleted timeline, this is a bug.
|
||||
return Err(anyhow::anyhow!(source).context(
|
||||
"This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
|
||||
));
|
||||
}
|
||||
LoadLocalTimelineError::ResumeDeletion(source) => {
|
||||
// Make sure resumed deletion wont fail loading for entire tenant.
|
||||
error!("Failed to resume timeline deletion: {source:#}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3814,9 +3835,9 @@ mod tests {
|
||||
.await
|
||||
.err()
|
||||
.expect("should fail");
|
||||
// get all the stack with all .context, not tonly the last one
|
||||
// get all the stack with all .context, not only the last one
|
||||
let message = format!("{err:#}");
|
||||
let expected = "Failed to parse metadata bytes from path";
|
||||
let expected = "failed to load metadata";
|
||||
assert!(
|
||||
message.contains(expected),
|
||||
"message '{message}' expected to contain {expected}"
|
||||
@@ -3833,7 +3854,8 @@ mod tests {
|
||||
}
|
||||
assert!(
|
||||
found_error_message,
|
||||
"didn't find the corrupted metadata error"
|
||||
"didn't find the corrupted metadata error in {}",
|
||||
message
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -9,10 +9,11 @@
|
||||
//! [`remote_timeline_client`]: super::remote_timeline_client
|
||||
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::io::{self, Write};
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use tracing::info_span;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::{
|
||||
@@ -267,24 +268,24 @@ pub fn save_metadata(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LoadMetadataError {
|
||||
#[error(transparent)]
|
||||
Read(#[from] io::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
Decode(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub fn load_metadata(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> anyhow::Result<TimelineMetadata> {
|
||||
) -> Result<TimelineMetadata, LoadMetadataError> {
|
||||
let metadata_path = conf.metadata_path(tenant_id, timeline_id);
|
||||
let metadata_bytes = std::fs::read(&metadata_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to read metadata bytes from path {}",
|
||||
metadata_path.display()
|
||||
)
|
||||
})?;
|
||||
TimelineMetadata::from_bytes(&metadata_bytes).with_context(|| {
|
||||
format!(
|
||||
"Failed to parse metadata bytes from path {}",
|
||||
metadata_path.display()
|
||||
)
|
||||
})
|
||||
let metadata_bytes = std::fs::read(metadata_path)?;
|
||||
|
||||
Ok(TimelineMetadata::from_bytes(&metadata_bytes)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user