mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
pageserver: fix race cleaning up timeline files when shut down during bootstrap (#10532)
## Problem Timeline bootstrap starts a flush loop, but doesn't reliably shut down the timeline (incl. waiting for flush loop to exit) before destroying UninitializedTimeline, and that destructor tries to clean up local storage. If local storage is still being written to, then this is unsound. Currently the symptom is that we see a "Directory not empty" error log, e.g. https://neon-github-public-dev.s3.amazonaws.com/reports/main/12966756686/index.html#testresult/5523f7d15f46f7f7/retries ## Summary of changes - Move fallible IO part of bootstrap into a function (notably, this is fallible in the case of the tenant being shut down while creation is happening) - When that function returns an error, call shutdown() on the timeline
This commit is contained in:
@@ -3169,12 +3169,16 @@ async fn put_tenant_timeline_import_basebackup(
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
let span = info_span!("import_basebackup",
|
||||
tenant_id=%tenant_id, timeline_id=%timeline_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
|
||||
async move {
|
||||
let state = get_state(&request);
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
|
||||
let broker_client = state.broker_client.clone();
|
||||
|
||||
|
||||
@@ -2426,7 +2426,7 @@ impl Tenant {
|
||||
// Make sure the freeze_and_flush reaches remote storage.
|
||||
tline.remote_client.wait_completion().await.unwrap();
|
||||
|
||||
let tl = uninit_tl.finish_creation()?;
|
||||
let tl = uninit_tl.finish_creation().await?;
|
||||
// The non-test code would call tl.activate() here.
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
@@ -4702,7 +4702,7 @@ impl Tenant {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_timeline = uninitialized_timeline.finish_creation()?;
|
||||
let new_timeline = uninitialized_timeline.finish_creation().await?;
|
||||
|
||||
// Root timeline gets its layers during creation and uploads them along with the metadata.
|
||||
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
|
||||
@@ -4892,10 +4892,11 @@ impl Tenant {
|
||||
}
|
||||
|
||||
// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
|
||||
let pgdata_path_deferred = pgdata_path.clone();
|
||||
scopeguard::defer! {
|
||||
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
|
||||
if let Err(e) = fs::remove_dir_all(&pgdata_path_deferred) {
|
||||
// this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
|
||||
error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
|
||||
error!("Failed to remove temporary initdb directory '{pgdata_path_deferred}': {e}");
|
||||
}
|
||||
}
|
||||
if let Some(existing_initdb_timeline_id) = load_existing_initdb {
|
||||
@@ -4962,7 +4963,7 @@ impl Tenant {
|
||||
pgdata_lsn,
|
||||
pg_version,
|
||||
);
|
||||
let raw_timeline = self
|
||||
let mut raw_timeline = self
|
||||
.prepare_new_timeline(
|
||||
timeline_id,
|
||||
&new_metadata,
|
||||
@@ -4973,42 +4974,33 @@ impl Tenant {
|
||||
.await?;
|
||||
|
||||
let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
|
||||
let unfinished_timeline = raw_timeline.raw_timeline()?;
|
||||
|
||||
// Flush the new layer files to disk, before we make the timeline as available to
|
||||
// the outside world.
|
||||
//
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
unfinished_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
import_datadir::import_timeline_from_postgres_datadir(
|
||||
unfinished_timeline,
|
||||
&pgdata_path,
|
||||
pgdata_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
Err(CreateTimelineError::Other(anyhow::anyhow!(
|
||||
"failpoint before-checkpoint-new-timeline"
|
||||
)))
|
||||
});
|
||||
|
||||
unfinished_timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
|
||||
raw_timeline
|
||||
.write(|unfinished_timeline| async move {
|
||||
import_datadir::import_timeline_from_postgres_datadir(
|
||||
&unfinished_timeline,
|
||||
&pgdata_path,
|
||||
pgdata_lsn,
|
||||
ctx,
|
||||
)
|
||||
})?;
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}"
|
||||
)
|
||||
})?;
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
Err(CreateTimelineError::Other(anyhow::anyhow!(
|
||||
"failpoint before-checkpoint-new-timeline"
|
||||
)))
|
||||
});
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
// All done!
|
||||
let timeline = raw_timeline.finish_creation()?;
|
||||
let timeline = raw_timeline.finish_creation().await?;
|
||||
|
||||
// Callers are responsible to wait for uploads to complete and for activating the timeline.
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::hash_map::Entry, fs, sync::Arc};
|
||||
use std::{collections::hash_map::Entry, fs, future::Future, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
@@ -8,7 +8,8 @@ use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
import_datadir,
|
||||
tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
tenant::{CreateTimelineError, CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
|
||||
};
|
||||
|
||||
use super::Timeline;
|
||||
@@ -24,6 +25,9 @@ pub struct UninitializedTimeline<'t> {
|
||||
pub(crate) owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
|
||||
/// Whether we spawned the inner Timeline's tasks such that we must later shut it down
|
||||
/// if aborting the timeline creation
|
||||
needs_shutdown: bool,
|
||||
}
|
||||
|
||||
impl<'t> UninitializedTimeline<'t> {
|
||||
@@ -36,6 +40,50 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
owning_tenant,
|
||||
timeline_id,
|
||||
raw_timeline,
|
||||
needs_shutdown: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// When writing data to this timeline during creation, use this wrapper: it will take care of
|
||||
/// setup of Timeline tasks required for I/O (flush loop) and making sure they are torn down
|
||||
/// later.
|
||||
pub(crate) async fn write<F, Fut>(&mut self, f: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: FnOnce(Arc<Timeline>) -> Fut,
|
||||
Fut: Future<Output = Result<(), CreateTimelineError>>,
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Remember that we did I/O (spawned the flush loop), so that we can check we shut it down on drop
|
||||
self.needs_shutdown = true;
|
||||
|
||||
let timeline = self.raw_timeline()?;
|
||||
|
||||
// Spawn flush loop so that the Timeline is ready to accept writes
|
||||
timeline.maybe_spawn_flush_loop();
|
||||
|
||||
// Invoke the provided function, which will write some data into the new timeline
|
||||
if let Err(e) = f(timeline.clone()).await {
|
||||
self.abort().await;
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
// Flush the underlying timeline's ephemeral layers to disk
|
||||
if let Err(e) = timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.context("Failed to flush after timeline creation writes")
|
||||
{
|
||||
self.abort().await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn abort(&self) {
|
||||
if let Some((raw_timeline, _)) = self.raw_timeline.as_ref() {
|
||||
raw_timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,11 +92,13 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
/// This function launches the flush loop if not already done.
|
||||
///
|
||||
/// The caller is responsible for activating the timeline (function `.activate()`).
|
||||
pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
|
||||
pub(crate) async fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_id = self.timeline_id;
|
||||
let tenant_shard_id = self.owning_tenant.tenant_shard_id;
|
||||
|
||||
if self.raw_timeline.is_none() {
|
||||
self.abort().await;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"No timeline for initialization found for {tenant_shard_id}/{timeline_id}"
|
||||
));
|
||||
@@ -62,16 +112,25 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
.0
|
||||
.get_disk_consistent_lsn();
|
||||
|
||||
anyhow::ensure!(
|
||||
new_disk_consistent_lsn.is_valid(),
|
||||
"new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
);
|
||||
if !new_disk_consistent_lsn.is_valid() {
|
||||
self.abort().await;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"new timeline {tenant_shard_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
));
|
||||
}
|
||||
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
match timelines.entry(timeline_id) {
|
||||
Entry::Occupied(_) => anyhow::bail!(
|
||||
Entry::Occupied(_) => {
|
||||
// Unexpected, bug in the caller. Tenant is responsible for preventing concurrent creation of the same timeline.
|
||||
//
|
||||
// We do not call Self::abort here. Because we don't cleanly shut down our Timeline, [`Self::drop`] should
|
||||
// skip trying to delete the timeline directory too.
|
||||
anyhow::bail!(
|
||||
"Found freshly initialized timeline {tenant_shard_id}/{timeline_id} in the tenant map"
|
||||
),
|
||||
)
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
// after taking here should be no fallible operations, because the drop guard will not
|
||||
// cleanup after and would block for example the tenant deletion
|
||||
@@ -93,36 +152,31 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
pub(crate) async fn import_basebackup_from_tar(
|
||||
self,
|
||||
mut self,
|
||||
tenant: Arc<Tenant>,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let raw_timeline = self.raw_timeline()?;
|
||||
self.write(|raw_timeline| async move {
|
||||
import_datadir::import_basebackup_from_tar(&raw_timeline, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")
|
||||
.map_err(CreateTimelineError::Other)?;
|
||||
|
||||
import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")?;
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
Err(CreateTimelineError::Other(anyhow::anyhow!(
|
||||
"failpoint before-checkpoint-new-timeline"
|
||||
)))
|
||||
});
|
||||
|
||||
// Flush the new layer files to disk, before we make the timeline as available to
|
||||
// the outside world.
|
||||
//
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
raw_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
anyhow::bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
raw_timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
// All the data has been imported. Insert the Timeline into the tenant's timelines map
|
||||
let tl = self.finish_creation()?;
|
||||
let tl = self.finish_creation().await?;
|
||||
tl.activate(tenant, broker_client, None, ctx);
|
||||
Ok(tl)
|
||||
}
|
||||
@@ -143,12 +197,19 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
|
||||
impl Drop for UninitializedTimeline<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, create_guard)) = self.raw_timeline.take() {
|
||||
if let Some((timeline, create_guard)) = self.raw_timeline.take() {
|
||||
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
|
||||
// This is unusual, but can happen harmlessly if the pageserver is stopped while
|
||||
// creating a timeline.
|
||||
info!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(create_guard);
|
||||
if self.needs_shutdown && !timeline.gate.close_complete() {
|
||||
// This should not happen: caller should call [`Self::abort`] on failures
|
||||
tracing::warn!(
|
||||
"Timeline not shut down after initialization failure, cannot clean up files"
|
||||
);
|
||||
} else {
|
||||
// This is unusual, but can happen harmlessly if the pageserver is stopped while
|
||||
// creating a timeline.
|
||||
info!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(create_guard);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user