pageserver: allow for deletion of importing timelines (#12033)

## Problem

Importing timelines can't currently be deleted. This is problematic
because:
1. Cplane cannot delete failed imports and we leave the timeline behind.
2. The flow does not support user driven cancellation of the import

## Summary of changes

On the pageserver: I've taken the path of least resistance, extended
`TimelineOrOffloaded`
with a new variant and added handling in the right places. I'm open to
thoughts here,
but I think it turned out better than I was envisioning.

On the storage controller: Again, fairly simple business: when a DELETE
timeline request is
received, we remove the import from the DB and stop any finalization
tasks/futures. In order
to stop finalizations, we track them in-memory. For each finalizing
import, we associate a gate
and a cancellation token.

Note that we delete the entry from the database before cancelling any
finalizations. This is such
that a concurrent request can't progress the import into finalize state
and race with the deletion.
This concern about deleting an import with on-going finalization is
theoretical in the near future.
We are only going to delete importing timelines after the storage
controller reports the failure to
cplane. Alas, the design works for user driven cancellation too.

Closes https://github.com/neondatabase/neon/issues/11897
This commit is contained in:
Vlad Lazar
2025-05-29 12:13:52 +01:00
committed by GitHub
parent 529d661532
commit 51639cd6af
9 changed files with 245 additions and 29 deletions

View File

@@ -482,6 +482,10 @@ async fn handle_tenant_timeline_delete(
ForwardOutcome::NotForwarded(_req) => {}
};
service
.maybe_delete_timeline_import(tenant_id, timeline_id)
.await?;
// For timeline deletions, which both implement an "initially return 202, then 404 once
// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>

View File

@@ -99,8 +99,8 @@ use crate::tenant_shard::{
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{
ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError,
TimelineImportState, UpcallClient,
FinalizingImport, ImportResult, ShardImportStatuses, TimelineImport,
TimelineImportFinalizeError, TimelineImportState, UpcallClient,
};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -232,6 +232,9 @@ struct ServiceState {
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
/// Tracks ongoing timeline import finalization tasks
imports_finalizing: BTreeMap<(TenantId, TimelineId), FinalizingImport>,
}
/// Transform an error from a pageserver into an error to return to callers of a storage
@@ -308,6 +311,7 @@ impl ServiceState {
scheduler,
ongoing_operation: None,
delayed_reconcile_rx,
imports_finalizing: Default::default(),
}
}
@@ -4097,13 +4101,58 @@ impl Service {
///
/// If this method gets pre-empted by shut down, it will be called again at start-up (on-going
/// imports are stored in the database).
///
/// # Cancel-Safety
/// Not cancel safe.
/// If the caller stops polling, the import will not be removed from
/// [`ServiceState::imports_finalizing`].
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
timeline_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> Result<(), TimelineImportFinalizeError> {
let tenant_timeline = (import.tenant_id, import.timeline_id);
let (_finalize_import_guard, cancel) = {
let mut locked = self.inner.write().unwrap();
let gate = Gate::default();
let cancel = CancellationToken::default();
let guard = gate.enter().unwrap();
locked.imports_finalizing.insert(
tenant_timeline,
FinalizingImport {
gate,
cancel: cancel.clone(),
},
);
(guard, cancel)
};
let res = tokio::select! {
res = self.finalize_timeline_import_impl(import) => {
res
},
_ = cancel.cancelled() => {
Err(TimelineImportFinalizeError::Cancelled)
}
};
let mut locked = self.inner.write().unwrap();
locked.imports_finalizing.remove(&tenant_timeline);
res
}
async fn finalize_timeline_import_impl(
self: &Arc<Self>,
import: TimelineImport,
) -> Result<(), TimelineImportFinalizeError> {
tracing::info!("Finalizing timeline import");
@@ -4303,6 +4352,46 @@ impl Service {
.await;
}
/// Delete a timeline import if it exists
///
/// Firstly, delete the entry from the database. Any updates
/// from pageservers after the update will fail with a 404, so the
/// import cannot progress into finalizing state if it's not there already.
/// Secondly, cancel the finalization if one is in progress.
pub(crate) async fn maybe_delete_timeline_import(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), DatabaseError> {
let tenant_has_ongoing_import = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.any(|(_tid, shard)| shard.importing == TimelineImportState::Importing)
};
if !tenant_has_ongoing_import {
return Ok(());
}
self.persistence
.delete_timeline_import(tenant_id, timeline_id)
.await?;
let maybe_finalizing = {
let mut locked = self.inner.write().unwrap();
locked.imports_finalizing.remove(&(tenant_id, timeline_id))
};
if let Some(finalizing) = maybe_finalizing {
finalizing.cancel.cancel();
finalizing.gate.close().await;
}
Ok(())
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,

View File

@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;
use utils::{
id::{TenantId, TimelineId},
shard::ShardIndex,
@@ -55,6 +56,8 @@ pub(crate) enum TimelineImportUpdateFollowUp {
pub(crate) enum TimelineImportFinalizeError {
#[error("Shut down interrupted import finalize")]
ShuttingDown,
#[error("Import finalization was cancelled")]
Cancelled,
#[error("Mismatched shard detected during import finalize: {0}")]
MismatchedShards(ShardIndex),
}
@@ -164,6 +167,11 @@ impl TimelineImport {
}
}
pub(crate) struct FinalizingImport {
pub(crate) gate: Gate,
pub(crate) cancel: CancellationToken,
}
pub(crate) type ImportResult = Result<(), String>;
pub(crate) struct UpcallClient {