From 045ae13e060c3717c921097444d5c6b09925e87c Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 13 May 2025 18:49:49 +0100 Subject: [PATCH] pageserver: make imports work with tenant shut downs (#11855) ## Problem Lifetime of imported timelines (and implicitly the import background task) has some shortcomings: 1. Timeline activation upon import completion is tricky. Previously, a timeline that finished importing after a tenant detach would not get activated and there's concerns about the safety of activating concurrently with shut-down. 2. Import jobs can prevent tenant shut down since they hold the tenant gate ## Summary of Changes Track the import tasks in memory and abort them explicitly on tenant shutdown. Integrate more closely with the storage controller: 1. When an import task has finished all of its jobs, it notifies the storage controller, but **does not** mark the import as done in the index_part. When all shards have finished importing, the storage controller will call the `/activate_post_import` idempotent endpoint for all of them. The handler, marks the import complete in index part, resets the tenant if required and checks if the timeline is active yet. 2. Not directly related, but the import job now gets the starting state from the storage controller instead of the import bucket. This paves the way for progress checkpointing. Related: https://github.com/neondatabase/neon/issues/11568 --- pageserver/client/src/mgmt_api.rs | 22 ++ pageserver/src/controller_upcall_client.rs | 40 +++ pageserver/src/deletion_queue.rs | 9 + pageserver/src/http/routes.rs | 105 ++++++ pageserver/src/tenant.rs | 222 ++++++------ .../src/tenant/remote_timeline_client.rs | 29 ++ .../src/tenant/timeline/import_pgdata.rs | 284 +++++++-------- .../src/tenant/timeline/import_pgdata/flow.rs | 4 + .../import_pgdata/importbucket_client.rs | 25 -- .../import_pgdata/importbucket_format.rs | 6 - .../import_pgdata/index_part_format.rs | 8 + storage_controller/src/http.rs | 30 ++ storage_controller/src/pageserver_client.rs | 19 + storage_controller/src/persistence.rs | 33 ++ storage_controller/src/service.rs | 328 +++++++++++------- .../src/service/safekeeper_service.rs | 7 +- storage_controller/src/timeline_import.rs | 22 +- test_runner/regress/test_import_pgdata.py | 91 ++++- 18 files changed, 859 insertions(+), 425 deletions(-) diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 4a87a91910..219e63c9d4 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::error::Error as _; +use std::time::Duration; use bytes::Bytes; use detach_ancestor::AncestorDetached; @@ -819,4 +820,25 @@ impl Client { .await .map(|resp| resp.status()) } + + pub async fn activate_post_import( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + activate_timeline_timeout: Duration, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}", + self.mgmt_api_endpoint, + tenant_shard_id, + timeline_id, + activate_timeline_timeout.as_millis() + ); + + self.request(Method::PUT, uri, ()) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } } diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 468e5463b0..6d186b091a 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -53,6 +53,11 @@ pub trait StorageControllerUpcallApi { timeline_id: TimelineId, status: ShardImportStatus, ) -> impl Future> + Send; + fn get_timeline_import_status( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + ) -> impl Future, RetryForeverError>> + Send; } impl StorageControllerUpcallClient { @@ -302,4 +307,39 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { self.retry_http_forever(&url, request).await } + + #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context + async fn get_timeline_import_status( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + ) -> Result, RetryForeverError> { + let url = self + .base_url + .join(format!("timeline_import_status/{}/{}", tenant_shard_id, timeline_id).as_str()) + .expect("Failed to build path"); + + Ok(backoff::retry( + || async { + let response = self.http_client.get(url.clone()).send().await?; + + if let Err(err) = response.error_for_status_ref() { + if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) { + return Ok(None); + } else { + return Err(err); + } + } + response.json::().await.map(Some) + }, + |_| false, + 3, + u32::MAX, + "storage controller upcall", + &self.cancel, + ) + .await + .ok_or(RetryForeverError::ShuttingDown)? + .expect("We retry forever, this should never be reached")) + } } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 4d62bc4ab5..65b2de28cd 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -663,6 +663,7 @@ mod test { use camino::Utf8Path; use hex_literal::hex; use pageserver_api::key::Key; + use pageserver_api::models::ShardImportStatus; use pageserver_api::shard::ShardIndex; use pageserver_api::upcall_api::ReAttachResponseTenant; use remote_storage::{RemoteStorageConfig, RemoteStorageKind}; @@ -796,6 +797,14 @@ mod test { ) -> Result<(), RetryForeverError> { unimplemented!() } + + async fn get_timeline_import_status( + &self, + _tenant_shard_id: TenantShardId, + _timeline_id: TimelineId, + ) -> Result, RetryForeverError> { + unimplemented!() + } } async fn setup(test_name: &str) -> anyhow::Result { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8b6500b020..2edec9dda1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3500,6 +3500,107 @@ async fn put_tenant_timeline_import_wal( }.instrument(span).await } +/// Activate a timeline after its import has completed +/// +/// The endpoint is idempotent and callers are expected to retry all +/// errors until a successful response. +async fn activate_post_import_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + const DEFAULT_ACTIVATE_TIMEOUT: Duration = Duration::from_secs(1); + let activate_timeout = parse_query_param(&request, "timeline_activate_timeout_ms")? + .map(Duration::from_millis) + .unwrap_or(DEFAULT_ACTIVATE_TIMEOUT); + + let span = info_span!( + "activate_post_import_handler", + tenant_id=%tenant_shard_id.tenant_id, + timeline_id=%timeline_id, + shard_id=%tenant_shard_id.shard_slug() + ); + + async move { + let state = get_state(&request); + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + + tenant + .finalize_importing_timeline(timeline_id) + .await + .map_err(ApiError::InternalServerError)?; + + match tenant.get_timeline(timeline_id, false) { + Ok(_timeline) => { + // Timeline is already visible. Reset not required: fall through. + } + Err(GetTimelineError::NotFound { .. }) => { + // This is crude: we reset the whole tenant such that the new timeline is detected + // and activated. We can come up with something more granular in the future. + // + // Note that we only reset the tenant if required: when the timeline is + // not present in [`Tenant::timelines`]. + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + state + .tenant_manager + .reset_tenant(tenant_shard_id, false, &ctx) + .await + .map_err(ApiError::InternalServerError)?; + } + Err(GetTimelineError::ShuttingDown) => { + return Err(ApiError::ShuttingDown); + } + Err(GetTimelineError::NotActive { .. }) => { + unreachable!("Called get_timeline with active_only=false"); + } + } + + let timeline = tenant.get_timeline(timeline_id, false)?; + + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn) + .with_scope_timeline(&timeline); + + let result = + tokio::time::timeout(activate_timeout, timeline.wait_to_become_active(&ctx)).await; + match result { + Ok(Ok(())) => { + // fallthrough + } + // Timeline reached some other state that's not active + // TODO(vlad): if the tenant is broken, return a permananet error + Ok(Err(_timeline_state)) => { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Timeline activation failed" + ))); + } + // Activation timed out + Err(_) => { + return Err(ApiError::Timeout("Timeline activation timed out".into())); + } + } + + let timeline_info = build_timeline_info( + &timeline, false, // include_non_incremental_logical_size, + false, // force_await_initial_logical_size + &ctx, + ) + .await + .context("get local timeline info") + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, timeline_info) + } + .instrument(span) + .await +} + /// Read the end of a tar archive. /// /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each. @@ -3924,5 +4025,9 @@ pub fn make_router( "/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal", |r| api_handler(r, put_tenant_timeline_import_wal), ) + .put( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import", + |r| api_handler(r, activate_post_import_handler), + ) .any(handler_404)) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e59db74479..441049f47d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -50,6 +50,7 @@ use remote_timeline_client::{ use secondary::heatmap::{HeatMapTenant, HeatMapTimeline}; use storage_broker::BrokerClientChannel; use timeline::compaction::{CompactionOutcome, GcCompactionQueue}; +use timeline::import_pgdata::ImportingTimeline; use timeline::offload::{OffloadError, offload_timeline}; use timeline::{ CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata, @@ -284,6 +285,19 @@ pub struct TenantShard { /// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating` timelines_offloaded: Mutex>>, + /// Tracks the timelines that are currently importing into this tenant shard. + /// + /// Note that importing timelines are also present in [`Self::timelines_creating`]. + /// Keep this in mind when ordering lock acquisition. + /// + /// Lifetime: + /// * An imported timeline is created while scanning the bucket on tenant attach + /// if the index part contains an `import_pgdata` entry and said field marks the import + /// as in progress. + /// * Imported timelines are removed when the storage controller calls the post timeline + /// import activation endpoint. + timelines_importing: std::sync::Mutex>, + /// The last tenant manifest known to be in remote storage. None if the manifest has not yet /// been either downloaded or uploaded. Always Some after tenant attach. /// @@ -923,19 +937,10 @@ enum StartCreatingTimelineResult { #[allow(clippy::large_enum_variant, reason = "TODO")] enum TimelineInitAndSyncResult { - ReadyToActivate(Arc), + ReadyToActivate, NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata), } -impl TimelineInitAndSyncResult { - fn ready_to_activate(self) -> Option> { - match self { - Self::ReadyToActivate(timeline) => Some(timeline), - _ => None, - } - } -} - #[must_use] struct TimelineInitAndSyncNeedsSpawnImportPgdata { timeline: Arc, @@ -1012,10 +1017,6 @@ enum CreateTimelineCause { enum LoadTimelineCause { Attach, Unoffload, - ImportPgdata { - create_guard: TimelineCreateGuard, - activate: ActivateTimelineArgs, - }, } #[derive(thiserror::Error, Debug)] @@ -1097,7 +1098,7 @@ impl TenantShard { self: &Arc, timeline_id: TimelineId, resources: TimelineResources, - mut index_part: IndexPart, + index_part: IndexPart, metadata: TimelineMetadata, previous_heatmap: Option, ancestor: Option>, @@ -1106,7 +1107,7 @@ impl TenantShard { ) -> anyhow::Result { let tenant_id = self.tenant_shard_id; - let import_pgdata = index_part.import_pgdata.take(); + let import_pgdata = index_part.import_pgdata.clone(); let idempotency = match &import_pgdata { Some(import_pgdata) => { CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata { @@ -1127,7 +1128,7 @@ impl TenantShard { } }; - let (timeline, timeline_ctx) = self.create_timeline_struct( + let (timeline, _timeline_ctx) = self.create_timeline_struct( timeline_id, &metadata, previous_heatmap, @@ -1197,14 +1198,6 @@ impl TenantShard { match import_pgdata { Some(import_pgdata) if !import_pgdata.is_done() => { - match cause { - LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), - LoadTimelineCause::ImportPgdata { .. } => { - unreachable!( - "ImportPgdata should not be reloading timeline import is done and persisted as such in s3" - ) - } - } let mut guard = self.timelines_creating.lock().unwrap(); if !guard.insert(timeline_id) { // We should never try and load the same timeline twice during startup @@ -1260,26 +1253,7 @@ impl TenantShard { "Timeline has no ancestor and no layer files" ); - match cause { - LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), - LoadTimelineCause::ImportPgdata { - create_guard, - activate, - } => { - // TODO: see the comment in the task code above how I'm not so certain - // it is safe to activate here because of concurrent shutdowns. - match activate { - ActivateTimelineArgs::Yes { broker_client } => { - info!("activating timeline after reload from pgdata import task"); - timeline.activate(self.clone(), broker_client, None, &timeline_ctx); - } - ActivateTimelineArgs::No => (), - } - drop(create_guard); - } - } - - Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline)) + Ok(TimelineInitAndSyncResult::ReadyToActivate) } } } @@ -1768,7 +1742,7 @@ impl TenantShard { })?; match effect { - TimelineInitAndSyncResult::ReadyToActivate(_) => { + TimelineInitAndSyncResult::ReadyToActivate => { // activation happens later, on Tenant::activate } TimelineInitAndSyncResult::NeedsSpawnImportPgdata( @@ -1778,13 +1752,24 @@ impl TenantShard { guard, }, ) => { - tokio::task::spawn(self.clone().create_timeline_import_pgdata_task( - timeline, - import_pgdata, - ActivateTimelineArgs::No, - guard, - ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), - )); + let timeline_id = timeline.timeline_id; + let import_task_handle = + tokio::task::spawn(self.clone().create_timeline_import_pgdata_task( + timeline.clone(), + import_pgdata, + guard, + ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), + )); + + let prev = self.timelines_importing.lock().unwrap().insert( + timeline_id, + ImportingTimeline { + timeline: timeline.clone(), + import_task_handle, + }, + ); + + assert!(prev.is_none()); } } } @@ -2678,14 +2663,7 @@ impl TenantShard { .await? } CreateTimelineParams::ImportPgdata(params) => { - self.create_timeline_import_pgdata( - params, - ActivateTimelineArgs::Yes { - broker_client: broker_client.clone(), - }, - ctx, - ) - .await? + self.create_timeline_import_pgdata(params, ctx).await? } }; @@ -2759,7 +2737,6 @@ impl TenantShard { async fn create_timeline_import_pgdata( self: &Arc, params: CreateTimelineParamsImportPgdata, - activate: ActivateTimelineArgs, ctx: &RequestContext, ) -> Result { let CreateTimelineParamsImportPgdata { @@ -2840,24 +2817,71 @@ impl TenantShard { let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself(); - tokio::spawn(self.clone().create_timeline_import_pgdata_task( + let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task( timeline.clone(), index_part, - activate, timeline_create_guard, timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), )); + let prev = self.timelines_importing.lock().unwrap().insert( + timeline.timeline_id, + ImportingTimeline { + timeline: timeline.clone(), + import_task_handle, + }, + ); + + // Idempotency is enforced higher up the stack + assert!(prev.is_none()); + // NB: the timeline doesn't exist in self.timelines at this point Ok(CreateTimelineResult::ImportSpawned(timeline)) } + /// Finalize the import of a timeline on this shard by marking it complete in + /// the index part. If the import task hasn't finished yet, returns an error. + /// + /// This method is idempotent. If the import was finalized once, the next call + /// will be a no-op. + pub(crate) async fn finalize_importing_timeline( + &self, + timeline_id: TimelineId, + ) -> anyhow::Result<()> { + let timeline = { + let locked = self.timelines_importing.lock().unwrap(); + match locked.get(&timeline_id) { + Some(importing_timeline) => { + if !importing_timeline.import_task_handle.is_finished() { + return Err(anyhow::anyhow!("Import task not done yet")); + } + + importing_timeline.timeline.clone() + } + None => { + return Ok(()); + } + } + }; + + timeline + .remote_client + .schedule_index_upload_for_import_pgdata_finalize()?; + timeline.remote_client.wait_completion().await?; + + self.timelines_importing + .lock() + .unwrap() + .remove(&timeline_id); + + Ok(()) + } + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))] async fn create_timeline_import_pgdata_task( self: Arc, timeline: Arc, index_part: import_pgdata::index_part_format::Root, - activate: ActivateTimelineArgs, timeline_create_guard: TimelineCreateGuard, ctx: RequestContext, ) { @@ -2869,7 +2893,6 @@ impl TenantShard { .create_timeline_import_pgdata_task_impl( timeline, index_part, - activate, timeline_create_guard, ctx, ) @@ -2885,60 +2908,15 @@ impl TenantShard { self: Arc, timeline: Arc, index_part: import_pgdata::index_part_format::Root, - activate: ActivateTimelineArgs, - timeline_create_guard: TimelineCreateGuard, + _timeline_create_guard: TimelineCreateGuard, ctx: RequestContext, ) -> Result<(), anyhow::Error> { info!("importing pgdata"); + let ctx = ctx.with_scope_timeline(&timeline); import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone()) .await .context("import")?; - info!("import done"); - - // - // Reload timeline from remote. - // This proves that the remote state is attachable, and it reuses the code. - // - // TODO: think about whether this is safe to do with concurrent TenantShard::shutdown. - // timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit. - // But our activate() call might launch new background tasks after TenantShard::shutdown - // already went past shutting down the TenantShard::timelines, which this timeline here is no part of. - // I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting - // down while bootstrapping/branching + activating), but, the race condition is much more likely - // to manifest because of the long runtime of this import task. - - // in theory this shouldn't even .await anything except for coop yield - info!("shutting down timeline"); - timeline.shutdown(ShutdownMode::Hard).await; - info!("timeline shut down, reloading from remote"); - // TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc - // let Some(timeline) = Arc::into_inner(timeline) else { - // anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere"); - // }; - let timeline_id = timeline.timeline_id; - - // load from object storage like TenantShard::attach does - let resources = self.build_timeline_resources(timeline_id); - let index_part = resources - .remote_client - .download_index_file(&self.cancel) - .await?; - let index_part = match index_part { - MaybeDeletedIndexPart::Deleted(_) => { - // likely concurrent delete call, cplane should prevent this - anyhow::bail!( - "index part says deleted but we are not done creating yet, this should not happen but" - ) - } - MaybeDeletedIndexPart::IndexPart(p) => p, - }; - let metadata = index_part.metadata.clone(); - self - .load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{ - create_guard: timeline_create_guard, activate, }, &ctx) - .await? - .ready_to_activate() - .context("implementation error: reloaded timeline still needs import after import reported success")?; + info!("import done - waiting for activation"); anyhow::Ok(()) } @@ -3475,6 +3453,14 @@ impl TenantShard { timeline.defuse_for_tenant_drop(); }); } + { + let mut timelines_importing = self.timelines_importing.lock().unwrap(); + timelines_importing + .drain() + .for_each(|(_timeline_id, importing_timeline)| { + importing_timeline.shutdown(); + }); + } // test_long_timeline_create_then_tenant_delete is leaning on this message tracing::info!("Waiting for timelines..."); while let Some(res) = js.join_next().await { @@ -3949,13 +3935,6 @@ where Ok(result) } -enum ActivateTimelineArgs { - Yes { - broker_client: storage_broker::BrokerClientChannel, - }, - No, -} - impl TenantShard { pub fn tenant_specific_overrides(&self) -> pageserver_api::models::TenantConfig { self.tenant_conf.load().tenant_conf.clone() @@ -4322,6 +4301,7 @@ impl TenantShard { timelines: Mutex::new(HashMap::new()), timelines_creating: Mutex::new(HashSet::new()), timelines_offloaded: Mutex::new(HashMap::new()), + timelines_importing: Mutex::new(HashMap::new()), remote_tenant_manifest: Default::default(), gc_cs: tokio::sync::Mutex::new(()), walredo_mgr, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index ea29f51956..21d68495f7 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -949,6 +949,35 @@ impl RemoteTimelineClient { Ok(()) } + /// If the `import_pgdata` field marks the timeline as having an import in progress, + /// launch an index-file upload operation that transitions it to done in the background + pub(crate) fn schedule_index_upload_for_import_pgdata_finalize( + self: &Arc, + ) -> anyhow::Result<()> { + use import_pgdata::index_part_format; + + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + let to_update = match &upload_queue.dirty.import_pgdata { + Some(import) if !import.is_done() => Some(import), + Some(_) | None => None, + }; + + if let Some(old) = to_update { + let new = + index_part_format::Root::V1(index_part_format::V1::Done(index_part_format::Done { + idempotency_key: old.idempotency_key().clone(), + started_at: *old.started_at(), + finished_at: chrono::Utc::now().naive_utc(), + })); + + upload_queue.dirty.import_pgdata = Some(new); + self.schedule_index_upload(upload_queue); + } + + Ok(()) + } + /// Launch an index-file upload operation in the background, setting `gc_compaction_state` field. pub(crate) fn schedule_index_upload_for_gc_compaction_state_update( self: &Arc, diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index c4a8df39a3..53e15e5395 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::{Context, bail}; use pageserver_api::models::ShardImportStatus; use remote_storage::RemotePath; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::info; use utils::lsn::Lsn; @@ -17,6 +18,17 @@ mod importbucket_client; mod importbucket_format; pub(crate) mod index_part_format; +pub(crate) struct ImportingTimeline { + pub import_task_handle: JoinHandle<()>, + pub timeline: Arc, +} + +impl ImportingTimeline { + pub(crate) fn shutdown(self) { + self.import_task_handle.abort(); + } +} + pub async fn doit( timeline: &Arc, index_part: index_part_format::Root, @@ -26,173 +38,161 @@ pub async fn doit( let index_part_format::Root::V1(v1) = index_part; let index_part_format::InProgress { location, - idempotency_key, - started_at, + idempotency_key: _, + started_at: _, } = match v1 { index_part_format::V1::Done(_) => return Ok(()), index_part_format::V1::InProgress(in_progress) => in_progress, }; - let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; + let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); - let status_prefix = RemotePath::from_string("status").unwrap(); + let shard_status = storcon_client + .get_timeline_import_status(timeline.tenant_shard_id, timeline.timeline_id) + .await + .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?; - // - // See if shard is done. - // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing. - // - let shard_status_key = - status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug())); - let shard_status: Option = - storage.get_json(&shard_status_key).await?; info!(?shard_status, "peeking shard status"); - if shard_status.map(|st| st.done).unwrap_or(false) { - info!("shard status indicates that the shard is done, skipping import"); - } else { - // TODO: checkpoint the progress into the IndexPart instead of restarting - // from the beginning. + match shard_status { + None | Some(ShardImportStatus::InProgress) => { + // TODO: checkpoint the progress into the IndexPart instead of restarting + // from the beginning. - // - // Wipe the slate clean - the flow does not allow resuming. - // We can implement resuming in the future by checkpointing the progress into the IndexPart. - // - info!("wipe the slate clean"); - { - // TODO: do we need to hold GC lock for this? - let mut guard = timeline.layers.write().await; - assert!( - guard.layer_map()?.open_layer.is_none(), - "while importing, there should be no in-memory layer" // this just seems like a good place to assert it - ); - let all_layers_keys = guard.all_persistent_layers(); - let all_layers: Vec<_> = all_layers_keys - .iter() - .map(|key| guard.get_from_key(key)) - .collect(); - let open = guard.open_mut().context("open_mut")?; + // + // Wipe the slate clean - the flow does not allow resuming. + // We can implement resuming in the future by checkpointing the progress into the IndexPart. + // + info!("wipe the slate clean"); + { + // TODO: do we need to hold GC lock for this? + let mut guard = timeline.layers.write().await; + assert!( + guard.layer_map()?.open_layer.is_none(), + "while importing, there should be no in-memory layer" // this just seems like a good place to assert it + ); + let all_layers_keys = guard.all_persistent_layers(); + let all_layers: Vec<_> = all_layers_keys + .iter() + .map(|key| guard.get_from_key(key)) + .collect(); + let open = guard.open_mut().context("open_mut")?; - timeline.remote_client.schedule_gc_update(&all_layers)?; - open.finish_gc_timeline(&all_layers); - } - - // - // Wait for pgdata to finish uploading - // - info!("wait for pgdata to reach status 'done'"); - let pgdata_status_key = status_prefix.join("pgdata"); - loop { - let res = async { - let pgdata_status: Option = storage - .get_json(&pgdata_status_key) - .await - .context("get pgdata status")?; - info!(?pgdata_status, "peeking pgdata status"); - if pgdata_status.map(|st| st.done).unwrap_or(false) { - Ok(()) - } else { - Err(anyhow::anyhow!("pgdata not done yet")) - } + timeline.remote_client.schedule_gc_update(&all_layers)?; + open.finish_gc_timeline(&all_layers); } - .await; - match res { - Ok(_) => break, - Err(err) => { - info!(?err, "indefinitely waiting for pgdata to finish"); - if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled()) + + // + // Wait for pgdata to finish uploading + // + info!("wait for pgdata to reach status 'done'"); + let storage = + importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; + let status_prefix = RemotePath::from_string("status").unwrap(); + let pgdata_status_key = status_prefix.join("pgdata"); + loop { + let res = async { + let pgdata_status: Option = storage + .get_json(&pgdata_status_key) + .await + .context("get pgdata status")?; + info!(?pgdata_status, "peeking pgdata status"); + if pgdata_status.map(|st| st.done).unwrap_or(false) { + Ok(()) + } else { + Err(anyhow::anyhow!("pgdata not done yet")) + } + } + .await; + match res { + Ok(_) => break, + Err(err) => { + info!(?err, "indefinitely waiting for pgdata to finish"); + if tokio::time::timeout( + std::time::Duration::from_secs(10), + cancel.cancelled(), + ) .await .is_ok() - { - bail!("cancelled while waiting for pgdata"); + { + bail!("cancelled while waiting for pgdata"); + } } } } - } - // - // Do the import - // - info!("do the import"); - let control_file = storage.get_control_file().await?; - let base_lsn = control_file.base_lsn(); + // + // Do the import + // + info!("do the import"); + let control_file = storage.get_control_file().await?; + let base_lsn = control_file.base_lsn(); - info!("update TimelineMetadata based on LSNs from control file"); - { - let pg_version = control_file.pg_version(); - let _ctx: &RequestContext = ctx; - async move { - // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the - // checkpoint record, and prev_record_lsn should point to its beginning. - // We should read the real end of the record from the WAL, but here we - // just fake it. - let disk_consistent_lsn = Lsn(base_lsn.0 + 8); - let prev_record_lsn = base_lsn; - let metadata = TimelineMetadata::new( - disk_consistent_lsn, - Some(prev_record_lsn), - None, // no ancestor - Lsn(0), // no ancestor lsn - base_lsn, // latest_gc_cutoff_lsn - base_lsn, // initdb_lsn - pg_version, - ); + info!("update TimelineMetadata based on LSNs from control file"); + { + let pg_version = control_file.pg_version(); + let _ctx: &RequestContext = ctx; + async move { + // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the + // checkpoint record, and prev_record_lsn should point to its beginning. + // We should read the real end of the record from the WAL, but here we + // just fake it. + let disk_consistent_lsn = Lsn(base_lsn.0 + 8); + let prev_record_lsn = base_lsn; + let metadata = TimelineMetadata::new( + disk_consistent_lsn, + Some(prev_record_lsn), + None, // no ancestor + Lsn(0), // no ancestor lsn + base_lsn, // latest_gc_cutoff_lsn + base_lsn, // initdb_lsn + pg_version, + ); - let _start_lsn = disk_consistent_lsn + 1; + let _start_lsn = disk_consistent_lsn + 1; - timeline - .remote_client - .schedule_index_upload_for_full_metadata_update(&metadata)?; + timeline + .remote_client + .schedule_index_upload_for_full_metadata_update(&metadata)?; - timeline.remote_client.wait_completion().await?; + timeline.remote_client.wait_completion().await?; - anyhow::Ok(()) + anyhow::Ok(()) + } } + .await?; + + flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?; + + // Communicate that shard is done. + // Ensure at-least-once delivery of the upcall to storage controller + // before we mark the task as done and never come here again. + // + // Note that we do not mark the import complete in the index part now. + // This happens in [`Tenant::finalize_importing_timeline`] in response + // to the storage controller calling + // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`. + storcon_client + .put_timeline_import_status( + timeline.tenant_shard_id, + timeline.timeline_id, + // TODO(vlad): What about import errors? + ShardImportStatus::Done, + ) + .await + .map_err(|_err| { + anyhow::anyhow!("Shut down while putting timeline import status") + })?; + } + Some(ShardImportStatus::Error(err)) => { + info!( + "shard status indicates that the shard is done (error), skipping import {}", + err + ); + } + Some(ShardImportStatus::Done) => { + info!("shard status indicates that the shard is done (success), skipping import"); } - .await?; - - flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?; - - // - // Communicate that shard is done. - // Ensure at-least-once delivery of the upcall to storage controller - // before we mark the task as done and never come here again. - // - let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); - storcon_client - .put_timeline_import_status( - timeline.tenant_shard_id, - timeline.timeline_id, - // TODO(vlad): What about import errors? - ShardImportStatus::Done, - ) - .await - .map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?; - - storage - .put_json( - &shard_status_key, - &importbucket_format::ShardStatus { done: true }, - ) - .await - .context("put shard status")?; } - // - // Mark as done in index_part. - // This makes subsequent timeline loads enter the normal load code path - // instead of spawning the import task and calling this here function. - // - info!("mark import as complete in index part"); - timeline - .remote_client - .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1( - index_part_format::V1::Done(index_part_format::Done { - idempotency_key, - started_at, - finished_at: chrono::Utc::now().naive_utc(), - }), - )))?; - - timeline.remote_client.wait_completion().await?; - Ok(()) } diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 34c073365d..5b9c8ec5b5 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -53,6 +53,7 @@ use tokio_stream::StreamExt; use tracing::{debug, instrument}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; +use utils::pausable_failpoint; use super::Timeline; use super::importbucket_client::{ControlFile, RemoteStorageWrapper}; @@ -79,6 +80,9 @@ pub async fn run( let import_config = &timeline.conf.timeline_import_config; let plan = planner.plan(import_config).await?; + + pausable_failpoint!("import-timeline-pre-execute-pausable"); + plan.execute(timeline, import_config, ctx).await } diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs index e7aa8f6038..34313748b7 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -190,31 +190,6 @@ impl RemoteStorageWrapper { Ok(Some(res)) } - #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] - pub async fn put_json(&self, path: &RemotePath, value: &T) -> anyhow::Result<()> - where - T: serde::Serialize, - { - let buf = serde_json::to_vec(value)?; - let bytes = Bytes::from(buf); - utils::backoff::retry( - || async { - let size = bytes.len(); - let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone()))); - self.storage - .upload_storage_object(bytes, size, path, &self.cancel) - .await - }, - remote_storage::TimeoutOrCancel::caused_by_cancel, - 1, - u32::MAX, - &format!("put json {path}"), - &self.cancel, - ) - .await - .expect("practically infinite retries") - } - #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] pub async fn get_range( &self, diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs index 57c647cc7f..d9f4da4748 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs @@ -5,9 +5,3 @@ pub struct PgdataStatus { pub done: bool, // TODO: remaining fields } - -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] -pub struct ShardStatus { - pub done: bool, - // TODO: remaining fields -} diff --git a/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs index ea7a41b25f..371fc857dc 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs @@ -64,4 +64,12 @@ impl Root { }, } } + pub fn started_at(&self) -> &chrono::NaiveDateTime { + match self { + Root::V1(v1) => match v1 { + V1::InProgress(in_progress) => &in_progress.started_at, + V1::Done(done) => &done.started_at, + }, + } + } } diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 649113b8ce..8d459cab9c 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -157,6 +157,29 @@ async fn handle_validate(req: Request) -> Result, ApiError> json_response(StatusCode::OK, state.service.validate(validate_req).await?) } +async fn handle_get_timeline_import_status(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::GenerationsApi)?; + + let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + + let req = match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(req) => req, + }; + + let state = get_state(&req); + json_response( + StatusCode::OK, + state + .service + .handle_timeline_shard_import_progress(tenant_shard_id, timeline_id) + .await?, + ) +} + async fn handle_put_timeline_import_status(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::GenerationsApi)?; @@ -2008,6 +2031,13 @@ pub fn make_router( .post("/upcall/v1/validate", |r| { named_request_span(r, handle_validate, RequestName("upcall_v1_validate")) }) + .get("/upcall/v1/timeline_import_status", |r| { + named_request_span( + r, + handle_get_timeline_import_status, + RequestName("upcall_v1_timeline_import_status"), + ) + }) .post("/upcall/v1/timeline_import_status", |r| { named_request_span( r, diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index 554ca375f5..817409e112 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use pageserver_api::models::detach_ancestor::AncestorDetached; use pageserver_api::models::{ DetachBehavior, LocationConfig, LocationConfigListResponse, LsnLease, PageserverUtilization, @@ -212,6 +214,7 @@ impl PageserverClient { ) } + #[allow(unused)] pub(crate) async fn timeline_detail( &self, tenant_shard_id: TenantShardId, @@ -357,4 +360,20 @@ impl PageserverClient { self.inner.wait_lsn(tenant_shard_id, request).await ) } + + pub(crate) async fn activate_post_import( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + timeline_activate_timeout: Duration, + ) -> Result { + measured_request!( + "activate_post_import", + crate::metrics::Method::Put, + &self.node_id_label, + self.inner + .activate_post_import(tenant_shard_id, timeline_id, timeline_activate_timeout) + .await + ) + } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 9ffcf9b9e6..052c0f02eb 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1666,6 +1666,39 @@ impl Persistence { } } + pub(crate) async fn get_timeline_import( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> DatabaseResult> { + use crate::schema::timeline_imports::dsl; + let persistent_import = self + .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| { + Box::pin(async move { + let mut from_db: Vec = dsl::timeline_imports + .filter(dsl::tenant_id.eq(tenant_id.to_string())) + .filter(dsl::timeline_id.eq(timeline_id.to_string())) + .load(conn) + .await?; + + if from_db.len() > 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + from_db.len() + ))); + } + + Ok(from_db.pop()) + }) + }) + .await?; + + persistent_import + .map(TimelineImport::from_persistent) + .transpose() + .map_err(|err| DatabaseError::Logical(format!("failed to deserialize import: {err}"))) + } + pub(crate) async fn delete_timeline_import( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 193050460d..05430733c2 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -35,12 +35,12 @@ use pageserver_api::controller_api::{ }; use pageserver_api::models::{ self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease, - PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig, + PageserverUtilization, SecondaryProgress, ShardImportStatus, ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon, - TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest, + TimelineInfo, TopTenantShardItem, TopTenantShardsRequest, }; use pageserver_api::shard::{ DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId, @@ -61,6 +61,7 @@ use utils::completion::Barrier; use utils::generation::Generation; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use utils::sync::gate::{Gate, GateGuard}; use utils::{failpoint_support, pausable_failpoint}; @@ -98,7 +99,8 @@ use crate::tenant_shard::{ ScheduleOptimization, ScheduleOptimizationAction, TenantShard, }; use crate::timeline_import::{ - ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient, + ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError, + TimelineImportState, UpcallClient, }; const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500); @@ -3905,6 +3907,38 @@ impl Service { }) } + pub(crate) async fn handle_timeline_shard_import_progress( + self: &Arc, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + ) -> Result { + let maybe_import = self + .persistence + .get_timeline_import(tenant_shard_id.tenant_id, timeline_id) + .await?; + + let import = maybe_import.ok_or_else(|| { + ApiError::NotFound( + format!( + "import for {}/{} not found", + tenant_shard_id.tenant_id, timeline_id + ) + .into(), + ) + })?; + + import + .shard_statuses + .0 + .get(&tenant_shard_id.to_index()) + .cloned() + .ok_or_else(|| { + ApiError::NotFound( + format!("shard {} not found", tenant_shard_id.shard_slug()).into(), + ) + }) + } + pub(crate) async fn handle_timeline_shard_import_progress_upcall( self: &Arc, req: PutTimelineImportStatusRequest, @@ -3943,6 +3977,16 @@ impl Service { Ok(()) } + /// Finalize the import of a timeline + /// + /// This method should be called once all shards have reported that the import is complete. + /// Firstly, it polls the post import timeline activation endpoint exposed by the pageserver. + /// Once the timeline is active on all shards, the timeline also gets created on the + /// safekeepers. Finally, notify cplane of the import completion (whether failed or + /// successful), and remove the import from the database and in-memory. + /// + /// If this method gets pre-empted by shut down, it will be called again at start-up (on-going + /// imports are stored in the database). #[instrument(skip_all, fields( tenant_id=%import.tenant_id, shard_id=%import.timeline_id, @@ -3950,59 +3994,80 @@ impl Service { async fn finalize_timeline_import( self: &Arc, import: TimelineImport, - ) -> anyhow::Result<()> { + ) -> Result<(), TimelineImportFinalizeError> { tracing::info!("Finalizing timeline import"); pausable_failpoint!("timeline-import-pre-cplane-notification"); - let import_failed = import.completion_error().is_some(); + let tenant_id = import.tenant_id; + let timeline_id = import.timeline_id; - if !import_failed { - loop { - if self.cancel.is_cancelled() { - anyhow::bail!("Shut down requested while finalizing import"); - } - - let active = self.timeline_active_on_all_shards(&import).await?; - - match active { - Some(timeline_info) => { - tracing::info!("Timeline became active on all shards"); - - if self.config.timelines_onto_safekeepers { - // Now that we know the start LSN of this timeline, create it on the - // safekeepers. - self.tenant_timeline_create_safekeepers_until_success( - import.tenant_id, - timeline_info, - ) - .await?; - } - - break; - } - None => { - tracing::info!("Timeline not active on all shards yet"); - - tokio::select! { - _ = self.cancel.cancelled() => { - anyhow::bail!("Shut down requested while finalizing import"); - }, - _ = tokio::time::sleep(Duration::from_secs(5)) => {} - }; - } - } + let import_error = import.completion_error(); + match import_error { + Some(err) => { + self.notify_cplane_and_delete_import(tenant_id, timeline_id, Err(err)) + .await?; + tracing::warn!("Timeline import completed with shard errors"); + Ok(()) } - } + None => match self.activate_timeline_post_import(&import).await { + Ok(timeline_info) => { + tracing::info!("Post import timeline activation complete"); + if self.config.timelines_onto_safekeepers { + // Now that we know the start LSN of this timeline, create it on the + // safekeepers. + self.tenant_timeline_create_safekeepers_until_success( + import.tenant_id, + timeline_info, + ) + .await?; + } + + self.notify_cplane_and_delete_import(tenant_id, timeline_id, Ok(())) + .await?; + + tracing::info!("Timeline import completed successfully"); + Ok(()) + } + Err(TimelineImportFinalizeError::ShuttingDown) => { + // We got pre-empted by shut down and will resume after the restart. + Err(TimelineImportFinalizeError::ShuttingDown) + } + Err(err) => { + // Any finalize error apart from shut down is permanent and requires us to notify + // cplane such that it can clean up. + tracing::error!("Import finalize failed with permanent error: {err}"); + self.notify_cplane_and_delete_import( + tenant_id, + timeline_id, + Err(err.to_string()), + ) + .await?; + Err(err) + } + }, + } + } + + async fn notify_cplane_and_delete_import( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + import_result: ImportResult, + ) -> Result<(), TimelineImportFinalizeError> { + let import_failed = import_result.is_err(); tracing::info!(%import_failed, "Notifying cplane of import completion"); let client = UpcallClient::new(self.get_config(), self.cancel.child_token()); - client.notify_import_complete(&import).await?; + client + .notify_import_complete(tenant_id, timeline_id, import_result) + .await + .map_err(|_err| TimelineImportFinalizeError::ShuttingDown)?; if let Err(err) = self .persistence - .delete_timeline_import(import.tenant_id, import.timeline_id) + .delete_timeline_import(tenant_id, timeline_id) .await { tracing::warn!("Failed to delete timeline import entry from database: {err}"); @@ -4012,14 +4077,113 @@ impl Service { .write() .unwrap() .tenants - .range_mut(TenantShardId::tenant_range(import.tenant_id)) + .range_mut(TenantShardId::tenant_range(tenant_id)) .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle); - tracing::info!(%import_failed, "Timeline import complete"); - Ok(()) } + /// Activate an imported timeline on all shards once the import is complete. + /// Returns the [`TimelineInfo`] reported by shard zero. + async fn activate_timeline_post_import( + self: &Arc, + import: &TimelineImport, + ) -> Result { + const TIMELINE_ACTIVATE_TIMEOUT: Duration = Duration::from_millis(128); + + let mut shards_to_activate: HashSet = + import.shard_statuses.0.keys().cloned().collect(); + let mut shard_zero_timeline_info = None; + + while !shards_to_activate.is_empty() { + if self.cancel.is_cancelled() { + return Err(TimelineImportFinalizeError::ShuttingDown); + } + + let targets = { + let locked = self.inner.read().unwrap(); + let mut targets = Vec::new(); + + for (tenant_shard_id, shard) in locked + .tenants + .range(TenantShardId::tenant_range(import.tenant_id)) + { + if !import + .shard_statuses + .0 + .contains_key(&tenant_shard_id.to_index()) + { + return Err(TimelineImportFinalizeError::MismatchedShards( + tenant_shard_id.to_index(), + )); + } + + if let Some(node_id) = shard.intent.get_attached() { + let node = locked + .nodes + .get(node_id) + .expect("Pageservers may not be deleted while referenced"); + targets.push((*tenant_shard_id, node.clone())); + } + } + + targets + }; + + let targeted_tenant_shards: Vec<_> = targets.iter().map(|(tid, _node)| *tid).collect(); + + let results = self + .tenant_for_shards_api( + targets, + |tenant_shard_id, client| async move { + client + .activate_post_import( + tenant_shard_id, + import.timeline_id, + TIMELINE_ACTIVATE_TIMEOUT, + ) + .await + }, + 1, + 1, + SHORT_RECONCILE_TIMEOUT, + &self.cancel, + ) + .await; + + let mut failed = 0; + for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) { + match result { + Ok(ok) => { + if tid.is_shard_zero() { + shard_zero_timeline_info = Some(ok); + } + + shards_to_activate.remove(&tid.to_index()); + } + Err(_err) => { + failed += 1; + } + } + } + + if failed > 0 { + tracing::info!( + "Failed to activate timeline on {failed} shards post import. Will retry" + ); + } + + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(250)) => {}, + _ = self.cancel.cancelled() => { + return Err(TimelineImportFinalizeError::ShuttingDown); + } + } + } + + Ok(shard_zero_timeline_info.expect("All shards replied")) + } + async fn finalize_timeline_imports(self: &Arc, imports: Vec) { futures::future::join_all( imports @@ -4029,78 +4193,6 @@ impl Service { .await; } - /// If the timeline is active on all shards, returns the [`TimelineInfo`] - /// collected from shard 0. - /// - /// An error is returned if the shard layout has changed during the import. - /// This is guarded against within the storage controller and the pageserver, - /// and, therefore, unexpected. - async fn timeline_active_on_all_shards( - self: &Arc, - import: &TimelineImport, - ) -> anyhow::Result> { - let targets = { - let locked = self.inner.read().unwrap(); - let mut targets = Vec::new(); - - for (tenant_shard_id, shard) in locked - .tenants - .range(TenantShardId::tenant_range(import.tenant_id)) - { - if !import - .shard_statuses - .0 - .contains_key(&tenant_shard_id.to_index()) - { - anyhow::bail!("Shard layout change detected on completion"); - } - - if let Some(node_id) = shard.intent.get_attached() { - let node = locked - .nodes - .get(node_id) - .expect("Pageservers may not be deleted while referenced"); - targets.push((*tenant_shard_id, node.clone())); - } else { - return Ok(None); - } - } - - targets - }; - - if targets.is_empty() { - anyhow::bail!("No shards found to finalize import for"); - } - - let results = self - .tenant_for_shards_api( - targets, - |tenant_shard_id, client| async move { - client - .timeline_detail(tenant_shard_id, import.timeline_id) - .await - }, - 1, - 1, - SHORT_RECONCILE_TIMEOUT, - &self.cancel, - ) - .await; - - let all_active = results.iter().all(|res| match res { - Ok(info) => info.state == TimelineState::Active, - Err(_) => false, - }); - - if all_active { - // Both unwraps are validated above - Ok(Some(results.into_iter().next().unwrap().unwrap())) - } else { - Ok(None) - } - } - pub(crate) async fn tenant_timeline_archival_config( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 5c15660ba3..cd5ace449d 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -10,6 +10,7 @@ use crate::persistence::{ DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence, }; use crate::safekeeper::Safekeeper; +use crate::timeline_import::TimelineImportFinalizeError; use anyhow::Context; use http_utils::error::ApiError; use pageserver_api::controller_api::{ @@ -327,12 +328,12 @@ impl Service { self: &Arc, tenant_id: TenantId, timeline_info: TimelineInfo, - ) -> anyhow::Result<()> { + ) -> Result<(), TimelineImportFinalizeError> { const BACKOFF: Duration = Duration::from_secs(5); loop { if self.cancel.is_cancelled() { - anyhow::bail!("Shut down requested while finalizing import"); + return Err(TimelineImportFinalizeError::ShuttingDown); } let res = self @@ -348,7 +349,7 @@ impl Service { tracing::error!("Failed to create timeline on safekeepers: {err}"); tokio::select! { _ = self.cancel.cancelled() => { - anyhow::bail!("Shut down requested while finalizing import"); + return Err(TimelineImportFinalizeError::ShuttingDown); }, _ = tokio::time::sleep(BACKOFF) => {} }; diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs index 6dcc538c4b..5d9d633932 100644 --- a/storage_controller/src/timeline_import.rs +++ b/storage_controller/src/timeline_import.rs @@ -46,6 +46,14 @@ pub(crate) enum TimelineImportUpdateFollowUp { None, } +#[derive(thiserror::Error, Debug)] +pub(crate) enum TimelineImportFinalizeError { + #[error("Shut down interrupted import finalize")] + ShuttingDown, + #[error("Mismatched shard detected during import finalize: {0}")] + MismatchedShards(ShardIndex), +} + pub(crate) enum TimelineImportUpdateError { ImportNotFound { tenant_id: TenantId, @@ -151,6 +159,8 @@ impl TimelineImport { } } +pub(crate) type ImportResult = Result<(), String>; + pub(crate) struct UpcallClient { authorization_header: Option, client: reqwest::Client, @@ -198,7 +208,9 @@ impl UpcallClient { /// eventual cplane availability. The cplane API is idempotent. pub(crate) async fn notify_import_complete( &self, - import: &TimelineImport, + tenant_id: TenantId, + timeline_id: TimelineId, + import_result: ImportResult, ) -> anyhow::Result<()> { let endpoint = if self.base_url.ends_with('/') { format!("{}import_complete", self.base_url) @@ -206,15 +218,13 @@ impl UpcallClient { format!("{}/import_complete", self.base_url) }; - tracing::info!("Endpoint is {endpoint}"); - let request = self .client .request(Method::PUT, endpoint) .json(&ImportCompleteRequest { - tenant_id: import.tenant_id, - timeline_id: import.timeline_id, - error: import.completion_error(), + tenant_id, + timeline_id, + error: import_result.err(), }) .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT); diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py index 05e63ad955..0472b92145 100644 --- a/test_runner/regress/test_import_pgdata.py +++ b/test_runner/regress/test_import_pgdata.py @@ -130,9 +130,8 @@ def test_pgdata_import_smoke( elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD: target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2 elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS: - # Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data - # to exercise multiple segments. - target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192) + segment_size = 16 * 1024 * 1024 + target_relblock_size = segment_size * 8 else: raise ValueError @@ -413,6 +412,88 @@ def test_import_completion_on_restart( wait_until(cplane_notified) +@run_only_on_default_postgres(reason="PG version is irrelevant here") +def test_import_respects_tenant_shutdown( + neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer +): + """ + Validate that importing timelines respect the usual timeline life cycle: + 1. Shut down on tenant shut-down and resumes upon re-attach + 2. Deletion on timeline deletion (TODO) + """ + # Set up mock control plane HTTP server to listen for import completions + import_completion_signaled = Event() + + def handler(request: Request) -> Response: + log.info(f"control plane /import_complete request: {request.json}") + import_completion_signaled.set() + return Response(json.dumps({}), status=200) + + cplane_mgmt_api_server = make_httpserver + cplane_mgmt_api_server.expect_request( + "/storage/api/v1/import_complete", method="PUT" + ).respond_with_handler(handler) + + # Plug the cplane mock in + neon_env_builder.control_plane_hooks_api = ( + f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/" + ) + + # The import will specifiy a local filesystem path mocking remote storage + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + vanilla_pg.start() + vanilla_pg.stop() + + env = neon_env_builder.init_configs() + env.start() + + importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket" + mock_import_bucket(vanilla_pg, importbucket_path) + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + idempotency = ImportPgdataIdemptencyKey.random() + + # Pause before sending the notification + failpoint_name = "import-timeline-pre-execute-pausable" + env.pageserver.http_client().configure_failpoints((failpoint_name, "pause")) + + env.storage_controller.tenant_create(tenant_id) + env.storage_controller.timeline_create( + tenant_id, + { + "new_timeline_id": str(timeline_id), + "import_pgdata": { + "idempotency_key": str(idempotency), + "location": {"LocalFs": {"path": str(importbucket_path.absolute())}}, + }, + }, + ) + + def hit_failpoint(): + log.info("Checking log for pattern...") + try: + assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*") + except Exception: + log.exception("Failed to find pattern in log") + raise + + wait_until(hit_failpoint) + assert not import_completion_signaled.is_set() + + # Restart the pageserver while an import job is in progress. + # This clears the failpoint and we expect that the import starts up afresh + # after the restart and eventually completes. + env.pageserver.stop() + env.pageserver.start() + + def cplane_notified(): + assert import_completion_signaled.is_set() + + wait_until(cplane_notified) + + def test_fast_import_with_pageserver_ingest( test_output_dir, vanilla_pg: VanillaPostgres, @@ -520,7 +601,9 @@ def test_fast_import_with_pageserver_ingest( env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id) # Run fast_import - fast_import.set_aws_creds(mock_s3_server, {"RUST_LOG": "aws_config=debug,aws_sdk_kms=debug"}) + fast_import.set_aws_creds( + mock_s3_server, {"RUST_LOG": "info,aws_config=debug,aws_sdk_kms=debug"} + ) pg_port = port_distributor.get_port() fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")