diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 4a87a91910..219e63c9d4 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::error::Error as _;
+use std::time::Duration;
 
 use bytes::Bytes;
 use detach_ancestor::AncestorDetached;
@@ -819,4 +820,25 @@ impl Client {
         .await
         .map(|resp| resp.status())
     }
+
+    pub async fn activate_post_import(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        activate_timeline_timeout: Duration,
+    ) -> Result<TimelineInfo> {
+        let uri = format!(
+            "{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}",
+            self.mgmt_api_endpoint,
+            tenant_shard_id,
+            timeline_id,
+            activate_timeline_timeout.as_millis()
+        );
+
+        self.request(Method::PUT, uri, ())
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
 }
diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs
index 468e5463b0..6d186b091a 100644
--- a/pageserver/src/controller_upcall_client.rs
+++ b/pageserver/src/controller_upcall_client.rs
@@ -53,6 +53,11 @@ pub trait StorageControllerUpcallApi {
         timeline_id: TimelineId,
         status: ShardImportStatus,
     ) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
+    fn get_timeline_import_status(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+    ) -> impl Future<Output = Result<Option<ShardImportStatus>, RetryForeverError>> + Send;
 }
 
 impl StorageControllerUpcallClient {
@@ -302,4 +307,39 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
         self.retry_http_forever(&url, request).await
     }
+
+    #[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
+    async fn get_timeline_import_status(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+    ) -> Result<Option<ShardImportStatus>, RetryForeverError> {
+        let url = self
+            .base_url
+            .join(format!("timeline_import_status/{}/{}", tenant_shard_id, timeline_id).as_str())
+            .expect("Failed to build path");
+
+        Ok(backoff::retry(
+            || async {
+                let response = self.http_client.get(url.clone()).send().await?;
+
+                if let Err(err) = response.error_for_status_ref() {
+                    if matches!(err.status(), Some(reqwest::StatusCode::NOT_FOUND)) {
+                        return Ok(None);
+                    } else {
+                        return Err(err);
+                    }
+                }
+                response.json::<ShardImportStatus>().await.map(Some)
+            },
+            |_| false,
+            3,
+            u32::MAX,
+            "storage controller upcall",
+            &self.cancel,
+        )
+        .await
+        .ok_or(RetryForeverError::ShuttingDown)?
+ .expect("We retry forever, this should never be reached")) + } } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 4d62bc4ab5..65b2de28cd 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -663,6 +663,7 @@ mod test { use camino::Utf8Path; use hex_literal::hex; use pageserver_api::key::Key; + use pageserver_api::models::ShardImportStatus; use pageserver_api::shard::ShardIndex; use pageserver_api::upcall_api::ReAttachResponseTenant; use remote_storage::{RemoteStorageConfig, RemoteStorageKind}; @@ -796,6 +797,14 @@ mod test { ) -> Result<(), RetryForeverError> { unimplemented!() } + + async fn get_timeline_import_status( + &self, + _tenant_shard_id: TenantShardId, + _timeline_id: TimelineId, + ) -> Result, RetryForeverError> { + unimplemented!() + } } async fn setup(test_name: &str) -> anyhow::Result { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8b6500b020..2edec9dda1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3500,6 +3500,107 @@ async fn put_tenant_timeline_import_wal( }.instrument(span).await } +/// Activate a timeline after its import has completed +/// +/// The endpoint is idempotent and callers are expected to retry all +/// errors until a successful response. +async fn activate_post_import_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; + const DEFAULT_ACTIVATE_TIMEOUT: Duration = Duration::from_secs(1); + let activate_timeout = parse_query_param(&request, "timeline_activate_timeout_ms")? + .map(Duration::from_millis) + .unwrap_or(DEFAULT_ACTIVATE_TIMEOUT); + + let span = info_span!( + "activate_post_import_handler", + tenant_id=%tenant_shard_id.tenant_id, + timeline_id=%timeline_id, + shard_id=%tenant_shard_id.shard_slug() + ); + + async move { + let state = get_state(&request); + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + + tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; + + tenant + .finalize_importing_timeline(timeline_id) + .await + .map_err(ApiError::InternalServerError)?; + + match tenant.get_timeline(timeline_id, false) { + Ok(_timeline) => { + // Timeline is already visible. Reset not required: fall through. + } + Err(GetTimelineError::NotFound { .. }) => { + // This is crude: we reset the whole tenant such that the new timeline is detected + // and activated. We can come up with something more granular in the future. + // + // Note that we only reset the tenant if required: when the timeline is + // not present in [`Tenant::timelines`]. + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + state + .tenant_manager + .reset_tenant(tenant_shard_id, false, &ctx) + .await + .map_err(ApiError::InternalServerError)?; + } + Err(GetTimelineError::ShuttingDown) => { + return Err(ApiError::ShuttingDown); + } + Err(GetTimelineError::NotActive { .. 
+            }) => {
+                unreachable!("Called get_timeline with active_only=false");
+            }
+        }
+
+        let timeline = tenant.get_timeline(timeline_id, false)?;
+
+        let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn)
+            .with_scope_timeline(&timeline);
+
+        let result =
+            tokio::time::timeout(activate_timeout, timeline.wait_to_become_active(&ctx)).await;
+        match result {
+            Ok(Ok(())) => {
+                // fallthrough
+            }
+            // Timeline reached some other state that's not active
+            // TODO(vlad): if the tenant is broken, return a permanent error
+            Ok(Err(_timeline_state)) => {
+                return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                    "Timeline activation failed"
+                )));
+            }
+            // Activation timed out
+            Err(_) => {
+                return Err(ApiError::Timeout("Timeline activation timed out".into()));
+            }
+        }
+
+        let timeline_info = build_timeline_info(
+            &timeline, false, // include_non_incremental_logical_size,
+            false, // force_await_initial_logical_size
+            &ctx,
+        )
+        .await
+        .context("get local timeline info")
+        .map_err(ApiError::InternalServerError)?;
+
+        json_response(StatusCode::OK, timeline_info)
+    }
+    .instrument(span)
+    .await
+}
+
 /// Read the end of a tar archive.
 ///
 /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -3924,5 +4025,9 @@ pub fn make_router(
             "/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
             |r| api_handler(r, put_tenant_timeline_import_wal),
         )
+        .put(
+            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
+            |r| api_handler(r, activate_post_import_handler),
+        )
         .any(handler_404))
 }
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index e59db74479..441049f47d 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -50,6 +50,7 @@ use remote_timeline_client::{
 use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
 use storage_broker::BrokerClientChannel;
 use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
+use timeline::import_pgdata::ImportingTimeline;
 use timeline::offload::{OffloadError, offload_timeline};
 use timeline::{
     CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata,
@@ -284,6 +285,19 @@ pub struct TenantShard {
     /// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
     timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
 
+    /// Tracks the timelines that are currently importing into this tenant shard.
+    ///
+    /// Note that importing timelines are also present in [`Self::timelines_creating`].
+    /// Keep this in mind when ordering lock acquisition.
+    ///
+    /// Lifetime:
+    /// * An imported timeline is created while scanning the bucket on tenant attach
+    ///   if the index part contains an `import_pgdata` entry and said field marks the import
+    ///   as in progress.
+    /// * Imported timelines are removed when the storage controller calls the post timeline
+    ///   import activation endpoint.
+    timelines_importing: std::sync::Mutex<HashMap<TimelineId, ImportingTimeline>>,
+
     /// The last tenant manifest known to be in remote storage. None if the manifest has not yet
     /// been either downloaded or uploaded. Always Some after tenant attach.
/// @@ -923,19 +937,10 @@ enum StartCreatingTimelineResult { #[allow(clippy::large_enum_variant, reason = "TODO")] enum TimelineInitAndSyncResult { - ReadyToActivate(Arc), + ReadyToActivate, NeedsSpawnImportPgdata(TimelineInitAndSyncNeedsSpawnImportPgdata), } -impl TimelineInitAndSyncResult { - fn ready_to_activate(self) -> Option> { - match self { - Self::ReadyToActivate(timeline) => Some(timeline), - _ => None, - } - } -} - #[must_use] struct TimelineInitAndSyncNeedsSpawnImportPgdata { timeline: Arc, @@ -1012,10 +1017,6 @@ enum CreateTimelineCause { enum LoadTimelineCause { Attach, Unoffload, - ImportPgdata { - create_guard: TimelineCreateGuard, - activate: ActivateTimelineArgs, - }, } #[derive(thiserror::Error, Debug)] @@ -1097,7 +1098,7 @@ impl TenantShard { self: &Arc, timeline_id: TimelineId, resources: TimelineResources, - mut index_part: IndexPart, + index_part: IndexPart, metadata: TimelineMetadata, previous_heatmap: Option, ancestor: Option>, @@ -1106,7 +1107,7 @@ impl TenantShard { ) -> anyhow::Result { let tenant_id = self.tenant_shard_id; - let import_pgdata = index_part.import_pgdata.take(); + let import_pgdata = index_part.import_pgdata.clone(); let idempotency = match &import_pgdata { Some(import_pgdata) => { CreateTimelineIdempotency::ImportPgdata(CreatingTimelineIdempotencyImportPgdata { @@ -1127,7 +1128,7 @@ impl TenantShard { } }; - let (timeline, timeline_ctx) = self.create_timeline_struct( + let (timeline, _timeline_ctx) = self.create_timeline_struct( timeline_id, &metadata, previous_heatmap, @@ -1197,14 +1198,6 @@ impl TenantShard { match import_pgdata { Some(import_pgdata) if !import_pgdata.is_done() => { - match cause { - LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), - LoadTimelineCause::ImportPgdata { .. } => { - unreachable!( - "ImportPgdata should not be reloading timeline import is done and persisted as such in s3" - ) - } - } let mut guard = self.timelines_creating.lock().unwrap(); if !guard.insert(timeline_id) { // We should never try and load the same timeline twice during startup @@ -1260,26 +1253,7 @@ impl TenantShard { "Timeline has no ancestor and no layer files" ); - match cause { - LoadTimelineCause::Attach | LoadTimelineCause::Unoffload => (), - LoadTimelineCause::ImportPgdata { - create_guard, - activate, - } => { - // TODO: see the comment in the task code above how I'm not so certain - // it is safe to activate here because of concurrent shutdowns. 
- match activate { - ActivateTimelineArgs::Yes { broker_client } => { - info!("activating timeline after reload from pgdata import task"); - timeline.activate(self.clone(), broker_client, None, &timeline_ctx); - } - ActivateTimelineArgs::No => (), - } - drop(create_guard); - } - } - - Ok(TimelineInitAndSyncResult::ReadyToActivate(timeline)) + Ok(TimelineInitAndSyncResult::ReadyToActivate) } } } @@ -1768,7 +1742,7 @@ impl TenantShard { })?; match effect { - TimelineInitAndSyncResult::ReadyToActivate(_) => { + TimelineInitAndSyncResult::ReadyToActivate => { // activation happens later, on Tenant::activate } TimelineInitAndSyncResult::NeedsSpawnImportPgdata( @@ -1778,13 +1752,24 @@ impl TenantShard { guard, }, ) => { - tokio::task::spawn(self.clone().create_timeline_import_pgdata_task( - timeline, - import_pgdata, - ActivateTimelineArgs::No, - guard, - ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), - )); + let timeline_id = timeline.timeline_id; + let import_task_handle = + tokio::task::spawn(self.clone().create_timeline_import_pgdata_task( + timeline.clone(), + import_pgdata, + guard, + ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), + )); + + let prev = self.timelines_importing.lock().unwrap().insert( + timeline_id, + ImportingTimeline { + timeline: timeline.clone(), + import_task_handle, + }, + ); + + assert!(prev.is_none()); } } } @@ -2678,14 +2663,7 @@ impl TenantShard { .await? } CreateTimelineParams::ImportPgdata(params) => { - self.create_timeline_import_pgdata( - params, - ActivateTimelineArgs::Yes { - broker_client: broker_client.clone(), - }, - ctx, - ) - .await? + self.create_timeline_import_pgdata(params, ctx).await? } }; @@ -2759,7 +2737,6 @@ impl TenantShard { async fn create_timeline_import_pgdata( self: &Arc, params: CreateTimelineParamsImportPgdata, - activate: ActivateTimelineArgs, ctx: &RequestContext, ) -> Result { let CreateTimelineParamsImportPgdata { @@ -2840,24 +2817,71 @@ impl TenantShard { let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself(); - tokio::spawn(self.clone().create_timeline_import_pgdata_task( + let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task( timeline.clone(), index_part, - activate, timeline_create_guard, timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), )); + let prev = self.timelines_importing.lock().unwrap().insert( + timeline.timeline_id, + ImportingTimeline { + timeline: timeline.clone(), + import_task_handle, + }, + ); + + // Idempotency is enforced higher up the stack + assert!(prev.is_none()); + // NB: the timeline doesn't exist in self.timelines at this point Ok(CreateTimelineResult::ImportSpawned(timeline)) } + /// Finalize the import of a timeline on this shard by marking it complete in + /// the index part. If the import task hasn't finished yet, returns an error. + /// + /// This method is idempotent. If the import was finalized once, the next call + /// will be a no-op. 
+ pub(crate) async fn finalize_importing_timeline( + &self, + timeline_id: TimelineId, + ) -> anyhow::Result<()> { + let timeline = { + let locked = self.timelines_importing.lock().unwrap(); + match locked.get(&timeline_id) { + Some(importing_timeline) => { + if !importing_timeline.import_task_handle.is_finished() { + return Err(anyhow::anyhow!("Import task not done yet")); + } + + importing_timeline.timeline.clone() + } + None => { + return Ok(()); + } + } + }; + + timeline + .remote_client + .schedule_index_upload_for_import_pgdata_finalize()?; + timeline.remote_client.wait_completion().await?; + + self.timelines_importing + .lock() + .unwrap() + .remove(&timeline_id); + + Ok(()) + } + #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))] async fn create_timeline_import_pgdata_task( self: Arc, timeline: Arc, index_part: import_pgdata::index_part_format::Root, - activate: ActivateTimelineArgs, timeline_create_guard: TimelineCreateGuard, ctx: RequestContext, ) { @@ -2869,7 +2893,6 @@ impl TenantShard { .create_timeline_import_pgdata_task_impl( timeline, index_part, - activate, timeline_create_guard, ctx, ) @@ -2885,60 +2908,15 @@ impl TenantShard { self: Arc, timeline: Arc, index_part: import_pgdata::index_part_format::Root, - activate: ActivateTimelineArgs, - timeline_create_guard: TimelineCreateGuard, + _timeline_create_guard: TimelineCreateGuard, ctx: RequestContext, ) -> Result<(), anyhow::Error> { info!("importing pgdata"); + let ctx = ctx.with_scope_timeline(&timeline); import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone()) .await .context("import")?; - info!("import done"); - - // - // Reload timeline from remote. - // This proves that the remote state is attachable, and it reuses the code. - // - // TODO: think about whether this is safe to do with concurrent TenantShard::shutdown. - // timeline_create_guard hols the tenant gate open, so, shutdown cannot _complete_ until we exit. - // But our activate() call might launch new background tasks after TenantShard::shutdown - // already went past shutting down the TenantShard::timelines, which this timeline here is no part of. - // I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting - // down while bootstrapping/branching + activating), but, the race condition is much more likely - // to manifest because of the long runtime of this import task. 
- - // in theory this shouldn't even .await anything except for coop yield - info!("shutting down timeline"); - timeline.shutdown(ShutdownMode::Hard).await; - info!("timeline shut down, reloading from remote"); - // TODO: we can't do the following check because create_timeline_import_pgdata must return an Arc - // let Some(timeline) = Arc::into_inner(timeline) else { - // anyhow::bail!("implementation error: timeline that we shut down was still referenced from somewhere"); - // }; - let timeline_id = timeline.timeline_id; - - // load from object storage like TenantShard::attach does - let resources = self.build_timeline_resources(timeline_id); - let index_part = resources - .remote_client - .download_index_file(&self.cancel) - .await?; - let index_part = match index_part { - MaybeDeletedIndexPart::Deleted(_) => { - // likely concurrent delete call, cplane should prevent this - anyhow::bail!( - "index part says deleted but we are not done creating yet, this should not happen but" - ) - } - MaybeDeletedIndexPart::IndexPart(p) => p, - }; - let metadata = index_part.metadata.clone(); - self - .load_remote_timeline(timeline_id, index_part, metadata, None, resources, LoadTimelineCause::ImportPgdata{ - create_guard: timeline_create_guard, activate, }, &ctx) - .await? - .ready_to_activate() - .context("implementation error: reloaded timeline still needs import after import reported success")?; + info!("import done - waiting for activation"); anyhow::Ok(()) } @@ -3475,6 +3453,14 @@ impl TenantShard { timeline.defuse_for_tenant_drop(); }); } + { + let mut timelines_importing = self.timelines_importing.lock().unwrap(); + timelines_importing + .drain() + .for_each(|(_timeline_id, importing_timeline)| { + importing_timeline.shutdown(); + }); + } // test_long_timeline_create_then_tenant_delete is leaning on this message tracing::info!("Waiting for timelines..."); while let Some(res) = js.join_next().await { @@ -3949,13 +3935,6 @@ where Ok(result) } -enum ActivateTimelineArgs { - Yes { - broker_client: storage_broker::BrokerClientChannel, - }, - No, -} - impl TenantShard { pub fn tenant_specific_overrides(&self) -> pageserver_api::models::TenantConfig { self.tenant_conf.load().tenant_conf.clone() @@ -4322,6 +4301,7 @@ impl TenantShard { timelines: Mutex::new(HashMap::new()), timelines_creating: Mutex::new(HashSet::new()), timelines_offloaded: Mutex::new(HashMap::new()), + timelines_importing: Mutex::new(HashMap::new()), remote_tenant_manifest: Default::default(), gc_cs: tokio::sync::Mutex::new(()), walredo_mgr, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index ea29f51956..21d68495f7 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -949,6 +949,35 @@ impl RemoteTimelineClient { Ok(()) } + /// If the `import_pgdata` field marks the timeline as having an import in progress, + /// launch an index-file upload operation that transitions it to done in the background + pub(crate) fn schedule_index_upload_for_import_pgdata_finalize( + self: &Arc, + ) -> anyhow::Result<()> { + use import_pgdata::index_part_format; + + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + let to_update = match &upload_queue.dirty.import_pgdata { + Some(import) if !import.is_done() => Some(import), + Some(_) | None => None, + }; + + if let Some(old) = to_update { + let new = + index_part_format::Root::V1(index_part_format::V1::Done(index_part_format::Done 
{ + idempotency_key: old.idempotency_key().clone(), + started_at: *old.started_at(), + finished_at: chrono::Utc::now().naive_utc(), + })); + + upload_queue.dirty.import_pgdata = Some(new); + self.schedule_index_upload(upload_queue); + } + + Ok(()) + } + /// Launch an index-file upload operation in the background, setting `gc_compaction_state` field. pub(crate) fn schedule_index_upload_for_gc_compaction_state_update( self: &Arc, diff --git a/pageserver/src/tenant/timeline/import_pgdata.rs b/pageserver/src/tenant/timeline/import_pgdata.rs index c4a8df39a3..53e15e5395 100644 --- a/pageserver/src/tenant/timeline/import_pgdata.rs +++ b/pageserver/src/tenant/timeline/import_pgdata.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::{Context, bail}; use pageserver_api::models::ShardImportStatus; use remote_storage::RemotePath; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::info; use utils::lsn::Lsn; @@ -17,6 +18,17 @@ mod importbucket_client; mod importbucket_format; pub(crate) mod index_part_format; +pub(crate) struct ImportingTimeline { + pub import_task_handle: JoinHandle<()>, + pub timeline: Arc, +} + +impl ImportingTimeline { + pub(crate) fn shutdown(self) { + self.import_task_handle.abort(); + } +} + pub async fn doit( timeline: &Arc, index_part: index_part_format::Root, @@ -26,173 +38,161 @@ pub async fn doit( let index_part_format::Root::V1(v1) = index_part; let index_part_format::InProgress { location, - idempotency_key, - started_at, + idempotency_key: _, + started_at: _, } = match v1 { index_part_format::V1::Done(_) => return Ok(()), index_part_format::V1::InProgress(in_progress) => in_progress, }; - let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; + let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); - let status_prefix = RemotePath::from_string("status").unwrap(); + let shard_status = storcon_client + .get_timeline_import_status(timeline.tenant_shard_id, timeline.timeline_id) + .await + .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?; - // - // See if shard is done. - // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing. - // - let shard_status_key = - status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug())); - let shard_status: Option = - storage.get_json(&shard_status_key).await?; info!(?shard_status, "peeking shard status"); - if shard_status.map(|st| st.done).unwrap_or(false) { - info!("shard status indicates that the shard is done, skipping import"); - } else { - // TODO: checkpoint the progress into the IndexPart instead of restarting - // from the beginning. + match shard_status { + None | Some(ShardImportStatus::InProgress) => { + // TODO: checkpoint the progress into the IndexPart instead of restarting + // from the beginning. - // - // Wipe the slate clean - the flow does not allow resuming. - // We can implement resuming in the future by checkpointing the progress into the IndexPart. - // - info!("wipe the slate clean"); - { - // TODO: do we need to hold GC lock for this? 
- let mut guard = timeline.layers.write().await; - assert!( - guard.layer_map()?.open_layer.is_none(), - "while importing, there should be no in-memory layer" // this just seems like a good place to assert it - ); - let all_layers_keys = guard.all_persistent_layers(); - let all_layers: Vec<_> = all_layers_keys - .iter() - .map(|key| guard.get_from_key(key)) - .collect(); - let open = guard.open_mut().context("open_mut")?; + // + // Wipe the slate clean - the flow does not allow resuming. + // We can implement resuming in the future by checkpointing the progress into the IndexPart. + // + info!("wipe the slate clean"); + { + // TODO: do we need to hold GC lock for this? + let mut guard = timeline.layers.write().await; + assert!( + guard.layer_map()?.open_layer.is_none(), + "while importing, there should be no in-memory layer" // this just seems like a good place to assert it + ); + let all_layers_keys = guard.all_persistent_layers(); + let all_layers: Vec<_> = all_layers_keys + .iter() + .map(|key| guard.get_from_key(key)) + .collect(); + let open = guard.open_mut().context("open_mut")?; - timeline.remote_client.schedule_gc_update(&all_layers)?; - open.finish_gc_timeline(&all_layers); - } - - // - // Wait for pgdata to finish uploading - // - info!("wait for pgdata to reach status 'done'"); - let pgdata_status_key = status_prefix.join("pgdata"); - loop { - let res = async { - let pgdata_status: Option = storage - .get_json(&pgdata_status_key) - .await - .context("get pgdata status")?; - info!(?pgdata_status, "peeking pgdata status"); - if pgdata_status.map(|st| st.done).unwrap_or(false) { - Ok(()) - } else { - Err(anyhow::anyhow!("pgdata not done yet")) - } + timeline.remote_client.schedule_gc_update(&all_layers)?; + open.finish_gc_timeline(&all_layers); } - .await; - match res { - Ok(_) => break, - Err(err) => { - info!(?err, "indefinitely waiting for pgdata to finish"); - if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled()) + + // + // Wait for pgdata to finish uploading + // + info!("wait for pgdata to reach status 'done'"); + let storage = + importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; + let status_prefix = RemotePath::from_string("status").unwrap(); + let pgdata_status_key = status_prefix.join("pgdata"); + loop { + let res = async { + let pgdata_status: Option = storage + .get_json(&pgdata_status_key) + .await + .context("get pgdata status")?; + info!(?pgdata_status, "peeking pgdata status"); + if pgdata_status.map(|st| st.done).unwrap_or(false) { + Ok(()) + } else { + Err(anyhow::anyhow!("pgdata not done yet")) + } + } + .await; + match res { + Ok(_) => break, + Err(err) => { + info!(?err, "indefinitely waiting for pgdata to finish"); + if tokio::time::timeout( + std::time::Duration::from_secs(10), + cancel.cancelled(), + ) .await .is_ok() - { - bail!("cancelled while waiting for pgdata"); + { + bail!("cancelled while waiting for pgdata"); + } } } } - } - // - // Do the import - // - info!("do the import"); - let control_file = storage.get_control_file().await?; - let base_lsn = control_file.base_lsn(); + // + // Do the import + // + info!("do the import"); + let control_file = storage.get_control_file().await?; + let base_lsn = control_file.base_lsn(); - info!("update TimelineMetadata based on LSNs from control file"); - { - let pg_version = control_file.pg_version(); - let _ctx: &RequestContext = ctx; - async move { - // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the - // checkpoint record, and 
prev_record_lsn should point to its beginning. - // We should read the real end of the record from the WAL, but here we - // just fake it. - let disk_consistent_lsn = Lsn(base_lsn.0 + 8); - let prev_record_lsn = base_lsn; - let metadata = TimelineMetadata::new( - disk_consistent_lsn, - Some(prev_record_lsn), - None, // no ancestor - Lsn(0), // no ancestor lsn - base_lsn, // latest_gc_cutoff_lsn - base_lsn, // initdb_lsn - pg_version, - ); + info!("update TimelineMetadata based on LSNs from control file"); + { + let pg_version = control_file.pg_version(); + let _ctx: &RequestContext = ctx; + async move { + // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the + // checkpoint record, and prev_record_lsn should point to its beginning. + // We should read the real end of the record from the WAL, but here we + // just fake it. + let disk_consistent_lsn = Lsn(base_lsn.0 + 8); + let prev_record_lsn = base_lsn; + let metadata = TimelineMetadata::new( + disk_consistent_lsn, + Some(prev_record_lsn), + None, // no ancestor + Lsn(0), // no ancestor lsn + base_lsn, // latest_gc_cutoff_lsn + base_lsn, // initdb_lsn + pg_version, + ); - let _start_lsn = disk_consistent_lsn + 1; + let _start_lsn = disk_consistent_lsn + 1; - timeline - .remote_client - .schedule_index_upload_for_full_metadata_update(&metadata)?; + timeline + .remote_client + .schedule_index_upload_for_full_metadata_update(&metadata)?; - timeline.remote_client.wait_completion().await?; + timeline.remote_client.wait_completion().await?; - anyhow::Ok(()) + anyhow::Ok(()) + } } + .await?; + + flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?; + + // Communicate that shard is done. + // Ensure at-least-once delivery of the upcall to storage controller + // before we mark the task as done and never come here again. + // + // Note that we do not mark the import complete in the index part now. + // This happens in [`Tenant::finalize_importing_timeline`] in response + // to the storage controller calling + // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`. + storcon_client + .put_timeline_import_status( + timeline.tenant_shard_id, + timeline.timeline_id, + // TODO(vlad): What about import errors? + ShardImportStatus::Done, + ) + .await + .map_err(|_err| { + anyhow::anyhow!("Shut down while putting timeline import status") + })?; + } + Some(ShardImportStatus::Error(err)) => { + info!( + "shard status indicates that the shard is done (error), skipping import {}", + err + ); + } + Some(ShardImportStatus::Done) => { + info!("shard status indicates that the shard is done (success), skipping import"); } - .await?; - - flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?; - - // - // Communicate that shard is done. - // Ensure at-least-once delivery of the upcall to storage controller - // before we mark the task as done and never come here again. - // - let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel); - storcon_client - .put_timeline_import_status( - timeline.tenant_shard_id, - timeline.timeline_id, - // TODO(vlad): What about import errors? - ShardImportStatus::Done, - ) - .await - .map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?; - - storage - .put_json( - &shard_status_key, - &importbucket_format::ShardStatus { done: true }, - ) - .await - .context("put shard status")?; } - // - // Mark as done in index_part. 
- // This makes subsequent timeline loads enter the normal load code path - // instead of spawning the import task and calling this here function. - // - info!("mark import as complete in index part"); - timeline - .remote_client - .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1( - index_part_format::V1::Done(index_part_format::Done { - idempotency_key, - started_at, - finished_at: chrono::Utc::now().naive_utc(), - }), - )))?; - - timeline.remote_client.wait_completion().await?; - Ok(()) } diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 34c073365d..5b9c8ec5b5 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -53,6 +53,7 @@ use tokio_stream::StreamExt; use tracing::{debug, instrument}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; +use utils::pausable_failpoint; use super::Timeline; use super::importbucket_client::{ControlFile, RemoteStorageWrapper}; @@ -79,6 +80,9 @@ pub async fn run( let import_config = &timeline.conf.timeline_import_config; let plan = planner.plan(import_config).await?; + + pausable_failpoint!("import-timeline-pre-execute-pausable"); + plan.execute(timeline, import_config, ctx).await } diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs index e7aa8f6038..34313748b7 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_client.rs @@ -190,31 +190,6 @@ impl RemoteStorageWrapper { Ok(Some(res)) } - #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] - pub async fn put_json(&self, path: &RemotePath, value: &T) -> anyhow::Result<()> - where - T: serde::Serialize, - { - let buf = serde_json::to_vec(value)?; - let bytes = Bytes::from(buf); - utils::backoff::retry( - || async { - let size = bytes.len(); - let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone()))); - self.storage - .upload_storage_object(bytes, size, path, &self.cancel) - .await - }, - remote_storage::TimeoutOrCancel::caused_by_cancel, - 1, - u32::MAX, - &format!("put json {path}"), - &self.cancel, - ) - .await - .expect("practically infinite retries") - } - #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))] pub async fn get_range( &self, diff --git a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs index 57c647cc7f..d9f4da4748 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/importbucket_format.rs @@ -5,9 +5,3 @@ pub struct PgdataStatus { pub done: bool, // TODO: remaining fields } - -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] -pub struct ShardStatus { - pub done: bool, - // TODO: remaining fields -} diff --git a/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs index ea7a41b25f..371fc857dc 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/index_part_format.rs @@ -64,4 +64,12 @@ impl Root { }, } } + pub fn started_at(&self) -> &chrono::NaiveDateTime { + match self { + Root::V1(v1) => match v1 { + V1::InProgress(in_progress) => 
&in_progress.started_at, + V1::Done(done) => &done.started_at, + }, + } + } } diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 649113b8ce..8d459cab9c 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -157,6 +157,29 @@ async fn handle_validate(req: Request) -> Result, ApiError> json_response(StatusCode::OK, state.service.validate(validate_req).await?) } +async fn handle_get_timeline_import_status(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::GenerationsApi)?; + + let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + + let req = match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(req) => req, + }; + + let state = get_state(&req); + json_response( + StatusCode::OK, + state + .service + .handle_timeline_shard_import_progress(tenant_shard_id, timeline_id) + .await?, + ) +} + async fn handle_put_timeline_import_status(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::GenerationsApi)?; @@ -2008,6 +2031,13 @@ pub fn make_router( .post("/upcall/v1/validate", |r| { named_request_span(r, handle_validate, RequestName("upcall_v1_validate")) }) + .get("/upcall/v1/timeline_import_status", |r| { + named_request_span( + r, + handle_get_timeline_import_status, + RequestName("upcall_v1_timeline_import_status"), + ) + }) .post("/upcall/v1/timeline_import_status", |r| { named_request_span( r, diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index 554ca375f5..817409e112 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use pageserver_api::models::detach_ancestor::AncestorDetached; use pageserver_api::models::{ DetachBehavior, LocationConfig, LocationConfigListResponse, LsnLease, PageserverUtilization, @@ -212,6 +214,7 @@ impl PageserverClient { ) } + #[allow(unused)] pub(crate) async fn timeline_detail( &self, tenant_shard_id: TenantShardId, @@ -357,4 +360,20 @@ impl PageserverClient { self.inner.wait_lsn(tenant_shard_id, request).await ) } + + pub(crate) async fn activate_post_import( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + timeline_activate_timeout: Duration, + ) -> Result { + measured_request!( + "activate_post_import", + crate::metrics::Method::Put, + &self.node_id_label, + self.inner + .activate_post_import(tenant_shard_id, timeline_id, timeline_activate_timeout) + .await + ) + } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 9ffcf9b9e6..052c0f02eb 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1666,6 +1666,39 @@ impl Persistence { } } + pub(crate) async fn get_timeline_import( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> DatabaseResult> { + use crate::schema::timeline_imports::dsl; + let persistent_import = self + .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| { + Box::pin(async move { + let mut from_db: Vec = dsl::timeline_imports + .filter(dsl::tenant_id.eq(tenant_id.to_string())) + .filter(dsl::timeline_id.eq(timeline_id.to_string())) + .load(conn) + .await?; + + if from_db.len() > 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + from_db.len() + ))); + } 
+ + Ok(from_db.pop()) + }) + }) + .await?; + + persistent_import + .map(TimelineImport::from_persistent) + .transpose() + .map_err(|err| DatabaseError::Logical(format!("failed to deserialize import: {err}"))) + } + pub(crate) async fn delete_timeline_import( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 193050460d..05430733c2 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -35,12 +35,12 @@ use pageserver_api::controller_api::{ }; use pageserver_api::models::{ self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease, - PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig, + PageserverUtilization, SecondaryProgress, ShardImportStatus, ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon, - TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest, + TimelineInfo, TopTenantShardItem, TopTenantShardsRequest, }; use pageserver_api::shard::{ DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId, @@ -61,6 +61,7 @@ use utils::completion::Barrier; use utils::generation::Generation; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use utils::sync::gate::{Gate, GateGuard}; use utils::{failpoint_support, pausable_failpoint}; @@ -98,7 +99,8 @@ use crate::tenant_shard::{ ScheduleOptimization, ScheduleOptimizationAction, TenantShard, }; use crate::timeline_import::{ - ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient, + ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError, + TimelineImportState, UpcallClient, }; const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500); @@ -3905,6 +3907,38 @@ impl Service { }) } + pub(crate) async fn handle_timeline_shard_import_progress( + self: &Arc, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + ) -> Result { + let maybe_import = self + .persistence + .get_timeline_import(tenant_shard_id.tenant_id, timeline_id) + .await?; + + let import = maybe_import.ok_or_else(|| { + ApiError::NotFound( + format!( + "import for {}/{} not found", + tenant_shard_id.tenant_id, timeline_id + ) + .into(), + ) + })?; + + import + .shard_statuses + .0 + .get(&tenant_shard_id.to_index()) + .cloned() + .ok_or_else(|| { + ApiError::NotFound( + format!("shard {} not found", tenant_shard_id.shard_slug()).into(), + ) + }) + } + pub(crate) async fn handle_timeline_shard_import_progress_upcall( self: &Arc, req: PutTimelineImportStatusRequest, @@ -3943,6 +3977,16 @@ impl Service { Ok(()) } + /// Finalize the import of a timeline + /// + /// This method should be called once all shards have reported that the import is complete. + /// Firstly, it polls the post import timeline activation endpoint exposed by the pageserver. + /// Once the timeline is active on all shards, the timeline also gets created on the + /// safekeepers. Finally, notify cplane of the import completion (whether failed or + /// successful), and remove the import from the database and in-memory. 
+ /// + /// If this method gets pre-empted by shut down, it will be called again at start-up (on-going + /// imports are stored in the database). #[instrument(skip_all, fields( tenant_id=%import.tenant_id, shard_id=%import.timeline_id, @@ -3950,59 +3994,80 @@ impl Service { async fn finalize_timeline_import( self: &Arc, import: TimelineImport, - ) -> anyhow::Result<()> { + ) -> Result<(), TimelineImportFinalizeError> { tracing::info!("Finalizing timeline import"); pausable_failpoint!("timeline-import-pre-cplane-notification"); - let import_failed = import.completion_error().is_some(); + let tenant_id = import.tenant_id; + let timeline_id = import.timeline_id; - if !import_failed { - loop { - if self.cancel.is_cancelled() { - anyhow::bail!("Shut down requested while finalizing import"); - } - - let active = self.timeline_active_on_all_shards(&import).await?; - - match active { - Some(timeline_info) => { - tracing::info!("Timeline became active on all shards"); - - if self.config.timelines_onto_safekeepers { - // Now that we know the start LSN of this timeline, create it on the - // safekeepers. - self.tenant_timeline_create_safekeepers_until_success( - import.tenant_id, - timeline_info, - ) - .await?; - } - - break; - } - None => { - tracing::info!("Timeline not active on all shards yet"); - - tokio::select! { - _ = self.cancel.cancelled() => { - anyhow::bail!("Shut down requested while finalizing import"); - }, - _ = tokio::time::sleep(Duration::from_secs(5)) => {} - }; - } - } + let import_error = import.completion_error(); + match import_error { + Some(err) => { + self.notify_cplane_and_delete_import(tenant_id, timeline_id, Err(err)) + .await?; + tracing::warn!("Timeline import completed with shard errors"); + Ok(()) } - } + None => match self.activate_timeline_post_import(&import).await { + Ok(timeline_info) => { + tracing::info!("Post import timeline activation complete"); + if self.config.timelines_onto_safekeepers { + // Now that we know the start LSN of this timeline, create it on the + // safekeepers. + self.tenant_timeline_create_safekeepers_until_success( + import.tenant_id, + timeline_info, + ) + .await?; + } + + self.notify_cplane_and_delete_import(tenant_id, timeline_id, Ok(())) + .await?; + + tracing::info!("Timeline import completed successfully"); + Ok(()) + } + Err(TimelineImportFinalizeError::ShuttingDown) => { + // We got pre-empted by shut down and will resume after the restart. + Err(TimelineImportFinalizeError::ShuttingDown) + } + Err(err) => { + // Any finalize error apart from shut down is permanent and requires us to notify + // cplane such that it can clean up. 
+ tracing::error!("Import finalize failed with permanent error: {err}"); + self.notify_cplane_and_delete_import( + tenant_id, + timeline_id, + Err(err.to_string()), + ) + .await?; + Err(err) + } + }, + } + } + + async fn notify_cplane_and_delete_import( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + import_result: ImportResult, + ) -> Result<(), TimelineImportFinalizeError> { + let import_failed = import_result.is_err(); tracing::info!(%import_failed, "Notifying cplane of import completion"); let client = UpcallClient::new(self.get_config(), self.cancel.child_token()); - client.notify_import_complete(&import).await?; + client + .notify_import_complete(tenant_id, timeline_id, import_result) + .await + .map_err(|_err| TimelineImportFinalizeError::ShuttingDown)?; if let Err(err) = self .persistence - .delete_timeline_import(import.tenant_id, import.timeline_id) + .delete_timeline_import(tenant_id, timeline_id) .await { tracing::warn!("Failed to delete timeline import entry from database: {err}"); @@ -4012,14 +4077,113 @@ impl Service { .write() .unwrap() .tenants - .range_mut(TenantShardId::tenant_range(import.tenant_id)) + .range_mut(TenantShardId::tenant_range(tenant_id)) .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle); - tracing::info!(%import_failed, "Timeline import complete"); - Ok(()) } + /// Activate an imported timeline on all shards once the import is complete. + /// Returns the [`TimelineInfo`] reported by shard zero. + async fn activate_timeline_post_import( + self: &Arc, + import: &TimelineImport, + ) -> Result { + const TIMELINE_ACTIVATE_TIMEOUT: Duration = Duration::from_millis(128); + + let mut shards_to_activate: HashSet = + import.shard_statuses.0.keys().cloned().collect(); + let mut shard_zero_timeline_info = None; + + while !shards_to_activate.is_empty() { + if self.cancel.is_cancelled() { + return Err(TimelineImportFinalizeError::ShuttingDown); + } + + let targets = { + let locked = self.inner.read().unwrap(); + let mut targets = Vec::new(); + + for (tenant_shard_id, shard) in locked + .tenants + .range(TenantShardId::tenant_range(import.tenant_id)) + { + if !import + .shard_statuses + .0 + .contains_key(&tenant_shard_id.to_index()) + { + return Err(TimelineImportFinalizeError::MismatchedShards( + tenant_shard_id.to_index(), + )); + } + + if let Some(node_id) = shard.intent.get_attached() { + let node = locked + .nodes + .get(node_id) + .expect("Pageservers may not be deleted while referenced"); + targets.push((*tenant_shard_id, node.clone())); + } + } + + targets + }; + + let targeted_tenant_shards: Vec<_> = targets.iter().map(|(tid, _node)| *tid).collect(); + + let results = self + .tenant_for_shards_api( + targets, + |tenant_shard_id, client| async move { + client + .activate_post_import( + tenant_shard_id, + import.timeline_id, + TIMELINE_ACTIVATE_TIMEOUT, + ) + .await + }, + 1, + 1, + SHORT_RECONCILE_TIMEOUT, + &self.cancel, + ) + .await; + + let mut failed = 0; + for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) { + match result { + Ok(ok) => { + if tid.is_shard_zero() { + shard_zero_timeline_info = Some(ok); + } + + shards_to_activate.remove(&tid.to_index()); + } + Err(_err) => { + failed += 1; + } + } + } + + if failed > 0 { + tracing::info!( + "Failed to activate timeline on {failed} shards post import. Will retry" + ); + } + + tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_millis(250)) => {}, + _ = self.cancel.cancelled() => { + return Err(TimelineImportFinalizeError::ShuttingDown); + } + } + } + + Ok(shard_zero_timeline_info.expect("All shards replied")) + } + async fn finalize_timeline_imports(self: &Arc, imports: Vec) { futures::future::join_all( imports @@ -4029,78 +4193,6 @@ impl Service { .await; } - /// If the timeline is active on all shards, returns the [`TimelineInfo`] - /// collected from shard 0. - /// - /// An error is returned if the shard layout has changed during the import. - /// This is guarded against within the storage controller and the pageserver, - /// and, therefore, unexpected. - async fn timeline_active_on_all_shards( - self: &Arc, - import: &TimelineImport, - ) -> anyhow::Result> { - let targets = { - let locked = self.inner.read().unwrap(); - let mut targets = Vec::new(); - - for (tenant_shard_id, shard) in locked - .tenants - .range(TenantShardId::tenant_range(import.tenant_id)) - { - if !import - .shard_statuses - .0 - .contains_key(&tenant_shard_id.to_index()) - { - anyhow::bail!("Shard layout change detected on completion"); - } - - if let Some(node_id) = shard.intent.get_attached() { - let node = locked - .nodes - .get(node_id) - .expect("Pageservers may not be deleted while referenced"); - targets.push((*tenant_shard_id, node.clone())); - } else { - return Ok(None); - } - } - - targets - }; - - if targets.is_empty() { - anyhow::bail!("No shards found to finalize import for"); - } - - let results = self - .tenant_for_shards_api( - targets, - |tenant_shard_id, client| async move { - client - .timeline_detail(tenant_shard_id, import.timeline_id) - .await - }, - 1, - 1, - SHORT_RECONCILE_TIMEOUT, - &self.cancel, - ) - .await; - - let all_active = results.iter().all(|res| match res { - Ok(info) => info.state == TimelineState::Active, - Err(_) => false, - }); - - if all_active { - // Both unwraps are validated above - Ok(Some(results.into_iter().next().unwrap().unwrap())) - } else { - Ok(None) - } - } - pub(crate) async fn tenant_timeline_archival_config( &self, tenant_id: TenantId, diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 5c15660ba3..cd5ace449d 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -10,6 +10,7 @@ use crate::persistence::{ DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence, }; use crate::safekeeper::Safekeeper; +use crate::timeline_import::TimelineImportFinalizeError; use anyhow::Context; use http_utils::error::ApiError; use pageserver_api::controller_api::{ @@ -327,12 +328,12 @@ impl Service { self: &Arc, tenant_id: TenantId, timeline_info: TimelineInfo, - ) -> anyhow::Result<()> { + ) -> Result<(), TimelineImportFinalizeError> { const BACKOFF: Duration = Duration::from_secs(5); loop { if self.cancel.is_cancelled() { - anyhow::bail!("Shut down requested while finalizing import"); + return Err(TimelineImportFinalizeError::ShuttingDown); } let res = self @@ -348,7 +349,7 @@ impl Service { tracing::error!("Failed to create timeline on safekeepers: {err}"); tokio::select! 
{ _ = self.cancel.cancelled() => { - anyhow::bail!("Shut down requested while finalizing import"); + return Err(TimelineImportFinalizeError::ShuttingDown); }, _ = tokio::time::sleep(BACKOFF) => {} }; diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs index 6dcc538c4b..5d9d633932 100644 --- a/storage_controller/src/timeline_import.rs +++ b/storage_controller/src/timeline_import.rs @@ -46,6 +46,14 @@ pub(crate) enum TimelineImportUpdateFollowUp { None, } +#[derive(thiserror::Error, Debug)] +pub(crate) enum TimelineImportFinalizeError { + #[error("Shut down interrupted import finalize")] + ShuttingDown, + #[error("Mismatched shard detected during import finalize: {0}")] + MismatchedShards(ShardIndex), +} + pub(crate) enum TimelineImportUpdateError { ImportNotFound { tenant_id: TenantId, @@ -151,6 +159,8 @@ impl TimelineImport { } } +pub(crate) type ImportResult = Result<(), String>; + pub(crate) struct UpcallClient { authorization_header: Option, client: reqwest::Client, @@ -198,7 +208,9 @@ impl UpcallClient { /// eventual cplane availability. The cplane API is idempotent. pub(crate) async fn notify_import_complete( &self, - import: &TimelineImport, + tenant_id: TenantId, + timeline_id: TimelineId, + import_result: ImportResult, ) -> anyhow::Result<()> { let endpoint = if self.base_url.ends_with('/') { format!("{}import_complete", self.base_url) @@ -206,15 +218,13 @@ impl UpcallClient { format!("{}/import_complete", self.base_url) }; - tracing::info!("Endpoint is {endpoint}"); - let request = self .client .request(Method::PUT, endpoint) .json(&ImportCompleteRequest { - tenant_id: import.tenant_id, - timeline_id: import.timeline_id, - error: import.completion_error(), + tenant_id, + timeline_id, + error: import_result.err(), }) .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT); diff --git a/test_runner/regress/test_import_pgdata.py b/test_runner/regress/test_import_pgdata.py index 05e63ad955..0472b92145 100644 --- a/test_runner/regress/test_import_pgdata.py +++ b/test_runner/regress/test_import_pgdata.py @@ -130,9 +130,8 @@ def test_pgdata_import_smoke( elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD: target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2 elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS: - # Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data - # to exercise multiple segments. - target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192) + segment_size = 16 * 1024 * 1024 + target_relblock_size = segment_size * 8 else: raise ValueError @@ -413,6 +412,88 @@ def test_import_completion_on_restart( wait_until(cplane_notified) +@run_only_on_default_postgres(reason="PG version is irrelevant here") +def test_import_respects_tenant_shutdown( + neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer +): + """ + Validate that importing timelines respect the usual timeline life cycle: + 1. Shut down on tenant shut-down and resumes upon re-attach + 2. 
Deletion on timeline deletion (TODO) + """ + # Set up mock control plane HTTP server to listen for import completions + import_completion_signaled = Event() + + def handler(request: Request) -> Response: + log.info(f"control plane /import_complete request: {request.json}") + import_completion_signaled.set() + return Response(json.dumps({}), status=200) + + cplane_mgmt_api_server = make_httpserver + cplane_mgmt_api_server.expect_request( + "/storage/api/v1/import_complete", method="PUT" + ).respond_with_handler(handler) + + # Plug the cplane mock in + neon_env_builder.control_plane_hooks_api = ( + f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/" + ) + + # The import will specifiy a local filesystem path mocking remote storage + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + vanilla_pg.start() + vanilla_pg.stop() + + env = neon_env_builder.init_configs() + env.start() + + importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket" + mock_import_bucket(vanilla_pg, importbucket_path) + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + idempotency = ImportPgdataIdemptencyKey.random() + + # Pause before sending the notification + failpoint_name = "import-timeline-pre-execute-pausable" + env.pageserver.http_client().configure_failpoints((failpoint_name, "pause")) + + env.storage_controller.tenant_create(tenant_id) + env.storage_controller.timeline_create( + tenant_id, + { + "new_timeline_id": str(timeline_id), + "import_pgdata": { + "idempotency_key": str(idempotency), + "location": {"LocalFs": {"path": str(importbucket_path.absolute())}}, + }, + }, + ) + + def hit_failpoint(): + log.info("Checking log for pattern...") + try: + assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*") + except Exception: + log.exception("Failed to find pattern in log") + raise + + wait_until(hit_failpoint) + assert not import_completion_signaled.is_set() + + # Restart the pageserver while an import job is in progress. + # This clears the failpoint and we expect that the import starts up afresh + # after the restart and eventually completes. + env.pageserver.stop() + env.pageserver.start() + + def cplane_notified(): + assert import_completion_signaled.is_set() + + wait_until(cplane_notified) + + def test_fast_import_with_pageserver_ingest( test_output_dir, vanilla_pg: VanillaPostgres, @@ -520,7 +601,9 @@ def test_fast_import_with_pageserver_ingest( env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id) # Run fast_import - fast_import.set_aws_creds(mock_s3_server, {"RUST_LOG": "aws_config=debug,aws_sdk_kms=debug"}) + fast_import.set_aws_creds( + mock_s3_server, {"RUST_LOG": "info,aws_config=debug,aws_sdk_kms=debug"} + ) pg_port = port_distributor.get_port() fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")
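// Not part of the patch above: a minimal sketch of how a caller might drive the new
// `PUT .../activate_post_import` endpoint added in pageserver/src/http/routes.rs.
// The URL shape and the `timeline_activate_timeout_ms` query parameter are taken from this
// diff, and the retry-until-success loop follows the handler's doc comment ("callers are
// expected to retry all errors until a successful response"). Assumptions: the `reqwest`
// crate (with its `json` feature), `tokio`, `anyhow`, and `serde_json` are available, and
// the management endpoint address and IDs below are placeholders, not values from the diff.
use std::time::Duration;

async fn activate_after_import(
    mgmt_endpoint: &str,   // e.g. "http://127.0.0.1:9898" (hypothetical pageserver mgmt address)
    tenant_shard_id: &str, // e.g. "<tenant_id>-0104" (placeholder shard id)
    timeline_id: &str,
    activate_timeout: Duration,
) -> anyhow::Result<serde_json::Value> {
    let url = format!(
        "{}/v1/tenant/{}/timeline/{}/activate_post_import?timeline_activate_timeout_ms={}",
        mgmt_endpoint,
        tenant_shard_id,
        timeline_id,
        activate_timeout.as_millis()
    );

    let client = reqwest::Client::new();
    loop {
        match client.put(&url).send().await {
            // On success the handler responds with the TimelineInfo of the now-active timeline.
            Ok(resp) if resp.status().is_success() => return Ok(resp.json().await?),
            // The endpoint is idempotent, so HTTP errors (e.g. the activation timeout mapped
            // to a timeout error) and transport errors are simply retried in this sketch.
            Ok(resp) => eprintln!("activation not complete yet: HTTP {}", resp.status()),
            Err(err) => eprintln!("request failed, retrying: {err}"),
        }
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
}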