From 51639cd6afc12ddb14a475c1b4be68e996a1389b Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Thu, 29 May 2025 12:13:52 +0100
Subject: [PATCH] pageserver: allow for deletion of importing timelines
 (#12033)

## Problem

Importing timelines cannot currently be deleted. This is problematic because:
1. Cplane cannot delete failed imports, so the timeline is left behind.
2. The flow does not support user-driven cancellation of the import.

## Summary of changes

On the pageserver, I've taken the path of least resistance: extended
`TimelineOrOffloaded` with a new variant and added handling in the right
places. I'm open to thoughts here, but I think it turned out better than I
was envisioning.

On the storage controller, it's again fairly simple business: when a timeline
DELETE request is received, we remove the import from the database and stop
any finalization tasks/futures. In order to stop finalizations, we track them
in memory: with each finalizing import, we associate a gate and a cancellation
token.

Note that we delete the entry from the database before cancelling any
finalizations. This ensures that a concurrent request cannot progress the
import into the finalizing state and race with the deletion.

In the near future, the concern about deleting an import with an ongoing
finalization is theoretical: we are only going to delete importing timelines
after the storage controller reports the failure to cplane. That said, the
design works for user-driven cancellation too.

Closes https://github.com/neondatabase/neon/issues/11897
---
 pageserver/src/tenant.rs                   | 50 +++++++++-
 pageserver/src/tenant/timeline/delete.rs   | 40 ++++++--
 .../src/tenant/timeline/import_pgdata.rs   | 17 +++-
 storage_controller/src/http.rs             |  4 +
 storage_controller/src/service.rs          | 93 ++++++++++++++++++-
 storage_controller/src/timeline_import.rs  |  8 ++
 test_runner/fixtures/neon_fixtures.py      | 16 ++++
 test_runner/fixtures/pageserver/http.py    |  4 +-
 test_runner/regress/test_import_pgdata.py  | 42 +++++++--
 9 files changed, 245 insertions(+), 29 deletions(-)
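Before the per-file changes, here is a minimal, self-contained sketch of the
delete-then-cancel ordering described above. All names below (`FakeDb`,
`ImportKey`, `FinalizingEntry`) are hypothetical stand-ins, and an mpsc
channel stands in for neon's internal `utils::sync::gate::Gate`; the real
code lives in `storage_controller/src/service.rs`.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

use tokio_util::sync::CancellationToken;

type ImportKey = (u64, u64); // stand-in for (TenantId, TimelineId)

struct FakeDb;

impl FakeDb {
    async fn delete_import(&self, _key: ImportKey) {
        // stand-in for the real database deletion
    }
}

struct FinalizingEntry {
    cancel: CancellationToken,
    // Each finalize task holds a Sender clone for its whole lifetime, so
    // recv() returning None means the task has exited (a poor man's Gate).
    done_rx: tokio::sync::mpsc::Receiver<()>,
}

async fn delete_import(
    db: &FakeDb,
    finalizing: &Mutex<BTreeMap<ImportKey, FinalizingEntry>>,
    key: ImportKey,
) {
    // 1. Delete the database entry FIRST: subsequent pageserver status
    //    updates fail, so a concurrent request can no longer move this
    //    import into the finalizing state behind our back.
    db.delete_import(key).await;

    // 2. Only then cancel an in-flight finalization, if any, and wait for
    //    the task to actually exit. The entry is taken out under the lock,
    //    but the lock is never held across an await point.
    let entry = finalizing.lock().unwrap().remove(&key);
    if let Some(mut entry) = entry {
        entry.cancel.cancel();
        let _ = entry.done_rx.recv().await;
    }
}
```

Deleting the row first is what closes the race: once it is gone, a pageserver
status update can no longer move the import into the finalizing state while
the teardown proceeds.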
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 58b766933d..d85d970583 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -300,7 +300,7 @@ pub struct TenantShard {
     /// as in progress.
     /// * Imported timelines are removed when the storage controller calls the post timeline
     /// import activation endpoint.
-    timelines_importing: std::sync::Mutex<HashMap<TimelineId, ImportingTimeline>>,
+    timelines_importing: std::sync::Mutex<HashMap<TimelineId, Arc<ImportingTimeline>>>,
 
     /// The last tenant manifest known to be in remote storage. None if the manifest has not yet
     /// been either downloaded or uploaded. Always Some after tenant attach.
@@ -672,6 +672,7 @@ pub enum MaybeOffloaded {
 pub enum TimelineOrOffloaded {
     Timeline(Arc<Timeline>),
     Offloaded(Arc<OffloadedTimeline>),
+    Importing(Arc<ImportingTimeline>),
 }
 
 impl TimelineOrOffloaded {
@@ -683,6 +684,9 @@ impl TimelineOrOffloaded {
             TimelineOrOffloaded::Offloaded(offloaded) => {
                 TimelineOrOffloadedArcRef::Offloaded(offloaded)
             }
+            TimelineOrOffloaded::Importing(importing) => {
+                TimelineOrOffloadedArcRef::Importing(importing)
+            }
         }
     }
     pub fn tenant_shard_id(&self) -> TenantShardId {
@@ -695,12 +699,16 @@ impl TimelineOrOffloaded {
         match self {
             TimelineOrOffloaded::Timeline(timeline) => &timeline.delete_progress,
             TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
+            TimelineOrOffloaded::Importing(importing) => &importing.delete_progress,
         }
     }
     fn maybe_remote_client(&self) -> Option<Arc<RemoteTimelineClient>> {
         match self {
             TimelineOrOffloaded::Timeline(timeline) => Some(timeline.remote_client.clone()),
             TimelineOrOffloaded::Offloaded(_offloaded) => None,
+            TimelineOrOffloaded::Importing(importing) => {
+                Some(importing.timeline.remote_client.clone())
+            }
         }
     }
 }
@@ -708,6 +716,7 @@ impl TimelineOrOffloaded {
 pub enum TimelineOrOffloadedArcRef<'a> {
     Timeline(&'a Arc<Timeline>),
     Offloaded(&'a Arc<OffloadedTimeline>),
+    Importing(&'a Arc<ImportingTimeline>),
 }
 
 impl TimelineOrOffloadedArcRef<'_> {
@@ -715,12 +724,14 @@ impl TimelineOrOffloadedArcRef<'_> {
         match self {
             TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
             TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
+            TimelineOrOffloadedArcRef::Importing(importing) => importing.timeline.tenant_shard_id,
         }
     }
     pub fn timeline_id(&self) -> TimelineId {
         match self {
             TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
             TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
+            TimelineOrOffloadedArcRef::Importing(importing) => importing.timeline.timeline_id,
         }
     }
 }
@@ -737,6 +748,12 @@ impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
     }
 }
 
+impl<'a> From<&'a Arc<ImportingTimeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<ImportingTimeline>) -> Self {
+        Self::Importing(timeline)
+    }
+}
+
 #[derive(Debug, thiserror::Error, PartialEq, Eq)]
 pub enum GetTimelineError {
     #[error("Timeline is shutting down")]
@@ -1789,20 +1806,25 @@ impl TenantShard {
             },
         ) => {
             let timeline_id = timeline.timeline_id;
+            let import_task_gate = Gate::default();
+            let import_task_guard = import_task_gate.enter().unwrap();
+
             let import_task_handle = tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
                 timeline.clone(),
                 import_pgdata,
                 guard,
+                import_task_guard,
                 ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
             ));
 
             let prev = self.timelines_importing.lock().unwrap().insert(
                 timeline_id,
-                ImportingTimeline {
+                Arc::new(ImportingTimeline {
                     timeline: timeline.clone(),
                     import_task_handle,
-                },
+                    import_task_gate,
+                    delete_progress: TimelineDeleteProgress::default(),
+                }),
             );
 
             assert!(prev.is_none());
@@ -2853,19 +2875,25 @@ impl TenantShard {
 
         let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself();
 
+        let import_task_gate = Gate::default();
+        let import_task_guard = import_task_gate.enter().unwrap();
+
         let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task(
             timeline.clone(),
             index_part,
             timeline_create_guard,
+            import_task_guard,
             timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
         ));
 
         let prev = self.timelines_importing.lock().unwrap().insert(
             timeline.timeline_id,
-            ImportingTimeline {
+            Arc::new(ImportingTimeline {
                 timeline: timeline.clone(),
                 import_task_handle,
-            },
+                import_task_gate,
+                delete_progress: TimelineDeleteProgress::default(),
+            }),
         );
 
         // Idempotency is enforced higher up the stack
@@ -2924,6 +2952,7 @@ impl TenantShard {
         timeline: Arc<Timeline>,
         index_part: import_pgdata::index_part_format::Root,
         timeline_create_guard: TimelineCreateGuard,
+        _import_task_guard: GateGuard,
         ctx: RequestContext,
     ) {
         debug_assert_current_span_has_tenant_and_timeline_id();
@@ -3835,6 +3864,9 @@ impl TenantShard {
                     .build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
                 Arc::new(remote_client)
             }
+            TimelineOrOffloadedArcRef::Importing(_) => {
+                unreachable!("Importing timelines are not included in the iterator")
+            }
         };
 
         // Shut down the timeline's remote client: this means that the indices we write
@@ -5044,6 +5076,14 @@ impl TenantShard {
                 info!("timeline already exists but is offloaded");
                 Err(CreateTimelineError::Conflict)
             }
+            Err(TimelineExclusionError::AlreadyExists {
+                existing: TimelineOrOffloaded::Importing(_existing),
+                ..
+            }) => {
+                // If there's a timeline already importing, then we would hit
+                // the [`TimelineExclusionError::AlreadyCreating`] branch above.
+                unreachable!("Importing timelines hold the creation guard")
+            }
             Err(TimelineExclusionError::AlreadyExists {
                 existing: TimelineOrOffloaded::Timeline(existing),
                 arg,
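The `import_task_gate` / `import_task_guard` pair above follows the usual gate
pattern: the guard is created before the task is spawned and moved into it, so
closing the gate waits for the task to have actually exited. A rough sketch of
the mechanics, using a plain mpsc channel as a stand-in for
`utils::sync::gate::Gate` (assumed behaviour, not the actual implementation):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // The "guard" half is created before the task is spawned and moved into
    // it, mirroring import_task_gate.enter() above: there is no window in
    // which the task runs without the gate knowing about it.
    let (guard, mut closed) = mpsc::channel::<()>(1);

    let handle = tokio::spawn(async move {
        let _guard = guard; // held for the task's entire lifetime
        tokio::time::sleep(Duration::from_secs(3600)).await; // stand-in for import work
    });

    // Shutdown mirrors ImportingTimeline::shutdown(): abort() stops the task
    // at its next await point; waiting for the channel to close is what
    // proves the task (and therefore its guard) is gone.
    handle.abort();
    let _ = closed.recv().await; // returns None once _guard has been dropped
}
```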
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index 1d4dd05e34..51bdd59f4f 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -121,6 +121,7 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
     // This observes the locking order between timelines and timelines_offloaded
     let mut timelines = tenant.timelines.lock().unwrap();
     let mut timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
+    let mut timelines_importing = tenant.timelines_importing.lock().unwrap();
     let offloaded_children_exist = timelines_offloaded
         .iter()
         .any(|(_, entry)| entry.ancestor_timeline_id == Some(timeline.timeline_id()));
@@ -150,8 +151,12 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
                 .expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
             offloaded_timeline.delete_from_ancestor_with_timelines(&timelines);
         }
+        TimelineOrOffloaded::Importing(importing) => {
+            timelines_importing.remove(&importing.timeline.timeline_id);
+        }
     }
 
+    drop(timelines_importing);
     drop(timelines_offloaded);
     drop(timelines);
 
@@ -203,8 +208,17 @@ impl DeleteTimelineFlow {
         guard.mark_in_progress()?;
 
         // Now that the Timeline is in Stopping state, request all the related tasks to shut down.
-        if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
-            timeline.shutdown(super::ShutdownMode::Hard).await;
+        match &timeline {
+            TimelineOrOffloaded::Timeline(timeline) => {
+                timeline.shutdown(super::ShutdownMode::Hard).await;
+            }
+            TimelineOrOffloaded::Importing(importing) => {
+                importing.shutdown().await;
+            }
+            TimelineOrOffloaded::Offloaded(_offloaded) => {
+                // Nothing to shut down in this case
+            }
         }
 
         tenant.gc_block.before_delete(&timeline.timeline_id());
@@ -389,10 +403,18 @@ impl DeleteTimelineFlow {
             Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
         });
 
-        // Offloaded timelines have no local state
-        // TODO: once we persist offloaded information, delete the timeline from there, too
-        if let TimelineOrOffloaded::Timeline(timeline) = timeline {
-            delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
+        match timeline {
+            TimelineOrOffloaded::Timeline(timeline) => {
+                delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
+            }
+            TimelineOrOffloaded::Importing(importing) => {
+                delete_local_timeline_directory(conf, tenant.tenant_shard_id, &importing.timeline)
+                    .await;
+            }
+            TimelineOrOffloaded::Offloaded(_offloaded) => {
+                // Offloaded timelines have no local state
+                // TODO: once we persist offloaded information, delete the timeline from there, too
+            }
         }
 
         fail::fail_point!("timeline-delete-after-rm", |_| {
@@ -451,12 +473,16 @@ pub(super) fn make_timeline_delete_guard(
    // For more context see this discussion: `https://github.com/neondatabase/neon/pull/4552#discussion_r1253437346`
    let timelines = tenant.timelines.lock().unwrap();
    let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
+   let timelines_importing = tenant.timelines_importing.lock().unwrap();
 
    let timeline = match timelines.get(&timeline_id) {
        Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
        None => match timelines_offloaded.get(&timeline_id) {
            Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
-           None => return Err(DeleteTimelineError::NotFound),
+           None => match timelines_importing.get(&timeline_id) {
+               Some(t) => TimelineOrOffloaded::Importing(Arc::clone(t)),
+               None => return Err(DeleteTimelineError::NotFound),
+           },
        },
    };
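Note that `remove_maybe_offloaded_timeline_from_tenant` and
`make_timeline_delete_guard` both acquire the three maps in the same order:
`timelines`, then `timelines_offloaded`, then `timelines_importing`. Keeping
one global acquisition order is what rules out deadlocks between concurrent
deletions. A toy illustration, with hypothetical types:

```rust
use std::sync::Mutex;

// Hypothetical stand-ins for the three per-tenant timeline maps.
struct TimelineMaps {
    timelines: Mutex<Vec<u64>>,
    timelines_offloaded: Mutex<Vec<u64>>,
    timelines_importing: Mutex<Vec<u64>>,
}

impl TimelineMaps {
    // Every path locks in this fixed order. A path that locked
    // timelines_importing first and timelines second could deadlock
    // against this one; a single global order makes that impossible.
    fn remove_everywhere(&self, timeline_id: u64) {
        let mut timelines = self.timelines.lock().unwrap();
        let mut offloaded = self.timelines_offloaded.lock().unwrap();
        let mut importing = self.timelines_importing.lock().unwrap();
        timelines.retain(|id| *id != timeline_id);
        offloaded.retain(|id| *id != timeline_id);
        importing.retain(|id| *id != timeline_id);
    }
}
```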
diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs
index db62e9000c..bdb34ec3a3 100644
--- a/pageserver/src/tenant/timeline/import_pgdata.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata.rs
@@ -8,8 +9,9 @@ use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::info;
 use utils::lsn::Lsn;
+use utils::sync::gate::Gate;
 
-use super::Timeline;
+use super::{Timeline, TimelineDeleteProgress};
 use crate::context::RequestContext;
 use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
 use crate::tenant::metadata::TimelineMetadata;
@@ -19,15 +20,23 @@ mod importbucket_client;
 mod importbucket_format;
 pub(crate) mod index_part_format;
 
-pub(crate) struct ImportingTimeline {
+pub struct ImportingTimeline {
     pub import_task_handle: JoinHandle<()>,
+    pub import_task_gate: Gate,
     pub timeline: Arc<Timeline>,
+    pub delete_progress: TimelineDeleteProgress,
+}
+
+impl std::fmt::Debug for ImportingTimeline {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ImportingTimeline<{}>", self.timeline.timeline_id)
+    }
 }
 
 impl ImportingTimeline {
-    pub(crate) async fn shutdown(self) {
+    pub async fn shutdown(&self) {
         self.import_task_handle.abort();
-        let _ = self.import_task_handle.await;
+        self.import_task_gate.close().await;
 
         self.timeline.remote_client.shutdown().await;
    }
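`ImportingTimeline` now also carries its own `delete_progress`, which is what
lets `DeleteTimelineFlow` treat repeated DELETE calls for the same timeline
idempotently, just as it does for ordinary timelines. Roughly, and simplified:
this sketch assumes `TimelineDeleteProgress` is a shared mutex over a small
state machine, which is the shape the delete flow uses elsewhere; names here
are illustrative only.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// Simplified stand-in for the deletion state machine.
#[derive(Default)]
enum DeleteProgress {
    #[default]
    NotStarted,
    InProgress,
    Finished,
}

// A second DELETE for the same timeline does not start a second flow; it
// takes the shared mutex and observes what the first call already did.
async fn delete_once(progress: Arc<Mutex<DeleteProgress>>) {
    let mut guard = progress.lock().await;
    match *guard {
        DeleteProgress::NotStarted => {
            *guard = DeleteProgress::InProgress;
            // ... run the actual deletion flow here, then:
            *guard = DeleteProgress::Finished;
        }
        // Idempotent: a concurrent or repeated call has nothing left to do.
        DeleteProgress::InProgress | DeleteProgress::Finished => {}
    }
}
```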
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 02c02c0e7f..2b1c0db12f 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -482,6 +482,10 @@ async fn handle_tenant_timeline_delete(
         ForwardOutcome::NotForwarded(_req) => {}
     };
 
+    service
+        .maybe_delete_timeline_import(tenant_id, timeline_id)
+        .await?;
+
     // For timeline deletions, which both implement an "initially return 202, then 404 once
     // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
     async fn deletion_wrapper<F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 823f4dadfa..790797bae2 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -99,8 +99,8 @@ use crate::tenant_shard::{
     ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
 };
 use crate::timeline_import::{
-    ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError,
-    TimelineImportState, UpcallClient,
+    FinalizingImport, ImportResult, ShardImportStatuses, TimelineImport,
+    TimelineImportFinalizeError, TimelineImportState, UpcallClient,
 };
 
 const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -232,6 +232,9 @@ struct ServiceState {
 
     /// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
     delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
+
+    /// Tracks ongoing timeline import finalization tasks
+    imports_finalizing: BTreeMap<(TenantId, TimelineId), FinalizingImport>,
 }
 
 /// Transform an error from a pageserver into an error to return to callers of a storage
@@ -308,6 +311,7 @@ impl ServiceState {
             scheduler,
             ongoing_operation: None,
             delayed_reconcile_rx,
+            imports_finalizing: Default::default(),
         }
     }
 
@@ -4097,13 +4101,58 @@
     ///
     /// If this method gets pre-empted by shut down, it will be called again at start-up (on-going
     /// imports are stored in the database).
+    ///
+    /// # Cancel-Safety
+    /// Not cancel safe.
+    /// If the caller stops polling, the import will not be removed from
+    /// [`ServiceState::imports_finalizing`].
     #[instrument(skip_all, fields(
         tenant_id=%import.tenant_id,
         timeline_id=%import.timeline_id,
     ))]
     async fn finalize_timeline_import(
         self: &Arc<Self>,
         import: TimelineImport,
+    ) -> Result<(), TimelineImportFinalizeError> {
+        let tenant_timeline = (import.tenant_id, import.timeline_id);
+
+        let (_finalize_import_guard, cancel) = {
+            let mut locked = self.inner.write().unwrap();
+            let gate = Gate::default();
+            let cancel = CancellationToken::default();
+
+            let guard = gate.enter().unwrap();
+
+            locked.imports_finalizing.insert(
+                tenant_timeline,
+                FinalizingImport {
+                    gate,
+                    cancel: cancel.clone(),
+                },
+            );
+
+            (guard, cancel)
+        };
+
+        let res = tokio::select! {
+            res = self.finalize_timeline_import_impl(import) => {
+                res
+            },
+            _ = cancel.cancelled() => {
+                Err(TimelineImportFinalizeError::Cancelled)
+            }
+        };
+
+        let mut locked = self.inner.write().unwrap();
+        locked.imports_finalizing.remove(&tenant_timeline);
+
+        res
+    }
+
+    async fn finalize_timeline_import_impl(
+        self: &Arc<Self>,
+        import: TimelineImport,
     ) -> Result<(), TimelineImportFinalizeError> {
         tracing::info!("Finalizing timeline import");
@@ -4303,6 +4352,46 @@ impl Service {
             .await;
     }
 
+    /// Delete a timeline import if one exists
+    ///
+    /// Firstly, delete the entry from the database: any subsequent status updates
+    /// from pageservers will fail with a 404, so the import cannot progress into
+    /// the finalizing state if it is not there already.
+    /// Secondly, cancel the finalization if one is in progress.
+    pub(crate) async fn maybe_delete_timeline_import(
+        self: &Arc<Self>,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> Result<(), DatabaseError> {
+        let tenant_has_ongoing_import = {
+            let locked = self.inner.read().unwrap();
+            locked
+                .tenants
+                .range(TenantShardId::tenant_range(tenant_id))
+                .any(|(_tid, shard)| shard.importing == TimelineImportState::Importing)
+        };
+
+        if !tenant_has_ongoing_import {
+            return Ok(());
+        }
+
+        self.persistence
+            .delete_timeline_import(tenant_id, timeline_id)
+            .await?;
+
+        let maybe_finalizing = {
+            let mut locked = self.inner.write().unwrap();
+            locked.imports_finalizing.remove(&(tenant_id, timeline_id))
+        };
+
+        if let Some(finalizing) = maybe_finalizing {
+            finalizing.cancel.cancel();
+            finalizing.gate.close().await;
+        }
+
+        Ok(())
+    }
+
     pub(crate) async fn tenant_timeline_archival_config(
         &self,
         tenant_id: TenantId,
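On the new `# Cancel-Safety` note above: the cleanup of `imports_finalizing`
sits after the `tokio::select!`, so it only runs if the caller polls
`finalize_timeline_import` to completion. A condensed illustration of why,
plus the usual Drop-guard shape if cancel-safety were ever needed (all names
hypothetical):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

async fn do_finalize_work() {
    // stand-in for finalize_timeline_import_impl
}

// If the caller drops this future at the await point, the removal below
// never runs and the entry leaks; hence the "not cancel safe" warning.
async fn finalize(key: u64, map: Arc<Mutex<HashSet<u64>>>) {
    map.lock().unwrap().insert(key);
    do_finalize_work().await;
    map.lock().unwrap().remove(&key);
}

// The usual fix, if cancel-safety were ever required: tie removal to Drop,
// which runs even when the future is dropped midway.
struct EntryGuard {
    key: u64,
    map: Arc<Mutex<HashSet<u64>>>,
}

impl Drop for EntryGuard {
    fn drop(&mut self) {
        self.map.lock().unwrap().remove(&self.key);
    }
}
```

This is not needed today: the only caller polls the future to completion, and
the doc comment records the constraint for future callers.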
diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs
index 909e8e2899..eb50819d02 100644
--- a/storage_controller/src/timeline_import.rs
+++ b/storage_controller/src/timeline_import.rs
@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
 
 use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
 use tokio_util::sync::CancellationToken;
+use utils::sync::gate::Gate;
 use utils::{
     id::{TenantId, TimelineId},
     shard::ShardIndex,
@@ -55,6 +56,8 @@ pub(crate) enum TimelineImportUpdateFollowUp {
 pub(crate) enum TimelineImportFinalizeError {
     #[error("Shut down interrupted import finalize")]
     ShuttingDown,
+    #[error("Import finalization was cancelled")]
+    Cancelled,
     #[error("Mismatched shard detected during import finalize: {0}")]
     MismatchedShards(ShardIndex),
 }
@@ -164,6 +167,11 @@ impl TimelineImport {
     }
 }
 
+pub(crate) struct FinalizingImport {
+    pub(crate) gate: Gate,
+    pub(crate) cancel: CancellationToken,
+}
+
 pub(crate) type ImportResult = Result<(), String>;
 
 pub(crate) struct UpcallClient {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 7f4150b580..eedeb4f696 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2337,6 +2337,22 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
+    def import_status(
+        self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: int
+    ):
+        payload = {
+            "tenant_shard_id": str(tenant_shard_id),
+            "timeline_id": str(timeline_id),
+            "generation": generation,
+        }
+
+        return self.request(
+            "GET",
+            f"{self.api}/upcall/v1/timeline_import_status",
+            headers=self.headers(TokenScope.GENERATIONS_API),
+            json=payload,
+        )
+
     def reconcile_all(self):
         r = self.request(
             "POST",
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index c2d176bf5a..c29192c25c 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -675,7 +675,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
 
     def timeline_delete(
         self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, **kwargs
-    ):
+    ) -> int:
         """
         Note that deletion is not instant, it is scheduled and performed mostly in the background.
         So if you need to wait for it to complete use `timeline_delete_wait_completed`.
@@ -688,6 +688,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         res_json = res.json()
         assert res_json is None
 
+        return res.status_code
+
     def timeline_gc(
         self,
         tenant_id: TenantId | TenantShardId,
diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py
index 69cbdec5b0..262ec9b06c 100644
--- a/test_runner/regress/test_import_pgdata.py
+++ b/test_runner/regress/test_import_pgdata.py
@@ -19,6 +19,7 @@ from fixtures.neon_fixtures import (
     PageserverImportConfig,
     PgBin,
     PgProtocol,
+    StorageControllerApiException,
     StorageControllerMigrationConfig,
     VanillaPostgres,
 )
@@ -423,8 +424,12 @@ def test_import_completion_on_restart(
 
 
 @run_only_on_default_postgres(reason="PG version is irrelevant here")
-def test_import_respects_tenant_shutdown(
-    neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
+@pytest.mark.parametrize("action", ["restart", "delete"])
+def test_import_respects_timeline_lifecycle(
+    neon_env_builder: NeonEnvBuilder,
+    vanilla_pg: VanillaPostgres,
+    make_httpserver: HTTPServer,
+    action: str,
 ):
     """
     Validate that importing timelines respect the usual timeline life cycle:
@@ -492,16 +497,33 @@ def test_import_respects_timeline_lifecycle(
     wait_until(hit_failpoint)
 
     assert not import_completion_signaled.is_set()
 
-    # Restart the pageserver while an import job is in progress.
-    # This clears the failpoint and we expect that the import starts up afresh
-    # after the restart and eventually completes.
-    env.pageserver.stop()
-    env.pageserver.start()
+    if action == "restart":
+        # Restart the pageserver while an import job is in progress.
+        # This clears the failpoint and we expect that the import starts up afresh
+        # after the restart and eventually completes.
+        env.pageserver.stop()
+        env.pageserver.start()
 
-    def cplane_notified():
-        assert import_completion_signaled.is_set()
+        def cplane_notified():
+            assert import_completion_signaled.is_set()
 
-    wait_until(cplane_notified)
+        wait_until(cplane_notified)
+    elif action == "delete":
+        status = env.storage_controller.pageserver_api().timeline_delete(tenant_id, timeline_id)
+        assert status == 200
+
+        timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
+        assert not timeline_path.exists(), "Timeline dir exists after deletion"
+
+        shard_zero = TenantShardId(tenant_id, 0, 0)
+        location = env.storage_controller.inspect(shard_zero)
+        assert location is not None
+        generation = location[0]
+
+        with pytest.raises(StorageControllerApiException, match="not found"):
+            env.storage_controller.import_status(shard_zero, timeline_id, generation)
+    else:
+        raise RuntimeError(f"{action} param not recognized")
 
 
 @skip_in_debug_build("Validation query takes too long in debug builds")