diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 5d1ac5ee15..4523fc2032 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -126,6 +126,7 @@ impl std::fmt::Debug for TenantState { /// A state of a timeline in pageserver's memory. #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum TimelineState { + Creating, /// The timeline is recognized by the pageserver but is not yet operational. /// In particular, the walreceiver connection loop is not running for this timeline. /// It will eventually transition to state Active or Broken. diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9e9285a009..5f795ea10e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -51,6 +51,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant; +use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; use crate::tenant::mgr::GetTenantError; use crate::tenant::{Tenant, Timeline}; @@ -489,7 +490,10 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; - let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?; + + let (uninit_mark, timeline) = tenant + .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) + .await?; // TODO mark timeline as not ready until it reaches end_lsn. // We might have some wal to import as well, and we should prevent compute @@ -503,21 +507,32 @@ impl PageServerHandler { // Import basebackup provided via CopyData info!("importing basebackup"); - pgb.write_message_noflush(&BeMessage::CopyInResponse)?; - pgb.flush().await?; + let doit = async { + pgb.write_message_noflush(&BeMessage::CopyInResponse)?; + pgb.flush().await?; - let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); - timeline - .import_basebackup_from_tar( - &mut copyin_reader, - base_lsn, - self.broker_client.clone(), - &ctx, - ) - .await?; + let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); + timeline + .import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx) + .await?; - // Read the end of the tar archive. - read_tar_eof(copyin_reader).await?; + // Read the end of the tar archive. + read_tar_eof(copyin_reader).await?; + anyhow::Ok(()) + }; + match doit.await { + Ok(()) => { + // TODO if we fail anywhere above, then we won't clean up the remote index part which create_empty_timeline already uploaded. + uninit_mark + .remove_uninit_mark() + .context("remove uninit mark")?; + } + Err(e) => { + debug_assert_current_span_has_tenant_and_timeline_id(); + error!("error importing basebackup: {:?}", e); + crate::tenant::cleanup_timeline_directory(uninit_mark); + } + } // TODO check checksum // Meanwhile you can verify client-side by taking fullbackup @@ -525,7 +540,9 @@ impl PageServerHandler { // It wouldn't work if base came from vanilla postgres though, // since we discard some log files. 
- info!("done"); + info!("done, activating timeline"); + timeline.activate(self.broker_client.clone(), &ctx); + Ok(()) } diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 82aebc6c07..7dcf1d16a8 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -270,6 +270,8 @@ pub enum TaskKind { DebugTool, + CreateTimeline, + #[cfg(test)] UnitTest, } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ebfbc91fac..6b020dd28e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -28,6 +28,7 @@ use std::collections::BTreeSet; use std::collections::HashMap; use std::ffi::OsStr; use std::fs; +use std::fs::DirEntry; use std::fs::File; use std::fs::OpenOptions; use std::io; @@ -40,7 +41,6 @@ use std::process::Stdio; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::MutexGuard; use std::sync::{Mutex, RwLock}; use std::time::{Duration, Instant}; @@ -157,144 +157,18 @@ pub struct Tenant { eviction_task_tenant_state: tokio::sync::Mutex, } -/// A timeline with some of its files on disk, being initialized. -/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or -/// its local files are removed. In the worst case of a crash, an uninit mark file is left behind, which causes the directory -/// to be removed on next restart. -/// -/// The caller is responsible for proper timeline data filling before the final init. -#[must_use] -pub struct UninitializedTimeline<'t> { - owning_tenant: &'t Tenant, - timeline_id: TimelineId, - raw_timeline: Option<(Arc, TimelineUninitMark)>, -} - /// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory, /// or gets removed eventually. /// /// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first. #[must_use] -struct TimelineUninitMark { +pub(crate) struct TimelineUninitMark { uninit_mark_deleted: bool, uninit_mark_path: PathBuf, timeline_path: PathBuf, } -impl UninitializedTimeline<'_> { - /// Ensures timeline data is valid, loads it into pageserver's memory and removes - /// uninit mark file on success. - /// - /// This function launches the flush loop if not already done. - /// - /// The caller is responsible for activating the timeline (function `.activate()`). 
- fn initialize_with_lock( - mut self, - _ctx: &RequestContext, - timelines: &mut HashMap<TimelineId, Arc<Timeline>>, - load_layer_map: bool, - ) -> anyhow::Result<Arc<Timeline>> { - let timeline_id = self.timeline_id; - let tenant_id = self.owning_tenant.tenant_id; - - let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| { - format!("No timeline for initalization found for {tenant_id}/{timeline_id}") - })?; - - let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn(); - // TODO it would be good to ensure that, but apparently a lot of our testing is dependend on that at least - // ensure!(new_disk_consistent_lsn.is_valid(), - // "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn and cannot be initialized"); - - match timelines.entry(timeline_id) { - Entry::Occupied(_) => anyhow::bail!( - "Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map" - ), - Entry::Vacant(v) => { - if load_layer_map { - new_timeline - .load_layer_map(new_disk_consistent_lsn) - .with_context(|| { - format!( - "Failed to load layermap for timeline {tenant_id}/{timeline_id}" - ) - })?; - } - uninit_mark.remove_uninit_mark().with_context(|| { - format!( - "Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}" - ) - })?; - v.insert(Arc::clone(&new_timeline)); - - new_timeline.maybe_spawn_flush_loop(); - } - } - - Ok(new_timeline) - } - - /// Prepares timeline data by loading it from the basebackup archive. - pub async fn import_basebackup_from_tar( - self, - copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), - base_lsn: Lsn, - broker_client: storage_broker::BrokerClientChannel, - ctx: &RequestContext, - ) -> anyhow::Result<Arc<Timeline>> { - let raw_timeline = self.raw_timeline()?; - - import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx) - .await - .context("Failed to import basebackup")?; - - // Flush loop needs to be spawned in order to be able to flush. - // We want to run proper checkpoint before we mark timeline as available to outside world - // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock - raw_timeline.maybe_spawn_flush_loop(); - - fail::fail_point!("before-checkpoint-new-timeline", |_| { - bail!("failpoint before-checkpoint-new-timeline"); - }); - - raw_timeline - .freeze_and_flush() - .await - .context("Failed to flush after basebackup import")?; - - // Initialize without loading the layer map. We started with an empty layer map, and already - // updated it for the layers that we created during the import. - let mut timelines = self.owning_tenant.timelines.lock().unwrap(); - let tl = self.initialize_with_lock(ctx, &mut timelines, false)?; - tl.activate(broker_client, ctx); - Ok(tl) - } - - fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> { - Ok(&self - .raw_timeline - .as_ref() - .with_context(|| { - format!( - "No raw timeline {}/{} found", - self.owning_tenant.tenant_id, self.timeline_id - ) - })?
- .0) - } -} - -impl Drop for UninitializedTimeline<'_> { - fn drop(&mut self) { - if let Some((_, uninit_mark)) = self.raw_timeline.take() { - let _entered = info_span!("drop_uninitialized_timeline", tenant = %self.owning_tenant.tenant_id, timeline = %self.timeline_id).entered(); - error!("Timeline got dropped without initializing, cleaning its files"); - cleanup_timeline_directory(uninit_mark); - } - } -} - -fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) { +pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) { let timeline_path = &uninit_mark.timeline_path; match ignore_absent_files(|| fs::remove_dir_all(timeline_path)) { Ok(()) => { @@ -308,7 +182,7 @@ fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) { } impl TimelineUninitMark { - fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self { + pub(crate) fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self { Self { uninit_mark_deleted: false, uninit_mark_path, @@ -316,7 +190,7 @@ impl TimelineUninitMark { } } - fn remove_uninit_mark(mut self) -> anyhow::Result<()> { + pub(crate) fn remove_uninit_mark(mut self) -> anyhow::Result<()> { if !self.uninit_mark_deleted { self.delete_mark_file_if_present()?; } @@ -344,9 +218,10 @@ impl Drop for TimelineUninitMark { if !self.uninit_mark_deleted { if self.timeline_path.exists() { error!( - "Uninit mark {} is not removed, timeline {} stays uninitialized", + "Uninit mark {} is not removed, timeline {} stays uninitialized\n{}", self.uninit_mark_path.display(), - self.timeline_path.display() + self.timeline_path.display(), + std::backtrace::Backtrace::force_capture(), ) } else { // unblock later timeline creation attempts @@ -471,6 +346,33 @@ impl std::fmt::Display for WaitToBecomeActiveError { } } +#[derive(Clone)] +pub enum TimelineLoadCause { + Startup, + Attach, + TenantCreate, + TimelineCreate { + placeholder_timeline: Arc<Timeline>, + }, + TenantLoad, + #[cfg(test)] + Test, +} + +impl std::fmt::Debug for TimelineLoadCause { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TimelineLoadCause::Startup => write!(f, "Startup"), + TimelineLoadCause::Attach => write!(f, "Attach"), + TimelineLoadCause::TenantCreate => write!(f, "TenantCreate"), + TimelineLoadCause::TimelineCreate { .. } => write!(f, "TimelineCreate"), + TimelineLoadCause::TenantLoad => write!(f, "TenantLoad"), + #[cfg(test)] + TimelineLoadCause::Test => write!(f, "Test"), + } + } +} + impl Tenant { /// Yet another helper for timeline initialization. /// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -486,10 +388,11 @@ impl Tenant { async fn timeline_init_and_sync( &self, timeline_id: TimelineId, - remote_client: Option<RemoteTimelineClient>, + remote_client: Option<Arc<RemoteTimelineClient>>, remote_startup_data: Option<RemoteStartupData>, local_metadata: Option<TimelineMetadata>, ancestor: Option<Arc<Timeline>>, + cause: TimelineLoadCause, first_save: bool, _ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -522,10 +425,20 @@ impl Tenant { // avoiding holding it across awaits let mut timelines_accessor = self.timelines.lock().unwrap(); match timelines_accessor.entry(timeline_id) { - Entry::Occupied(_) => { - unreachable!( - "Timeline {tenant_id}/{timeline_id} already exists in the tenant map" - ); + Entry::Occupied(mut e) => { + match cause { + TimelineLoadCause::TenantCreate => unreachable!("tenant creates no timelines, so, we don't reach here"), + TimelineLoadCause::Startup | + TimelineLoadCause::TenantLoad | + TimelineLoadCause::Attach => unreachable!("when loading a full tenant, the loading entity is responsible for ensuring there are no duplicates, cause={cause:?}"), + TimelineLoadCause::TimelineCreate { placeholder_timeline } => { + assert!(Arc::ptr_eq(&e.get(), &placeholder_timeline), "when creating a timeline, the placeholder timeline should be the one in the map"); + e.insert(Arc::clone(&timeline)); + timeline + }, + #[cfg(test)] + TimelineLoadCause::Test => todo!(), + } } Entry::Vacant(v) => { v.insert(Arc::clone(&timeline)); @@ -814,13 +727,14 @@ impl Tenant { self.timeline_init_and_sync( timeline_id, - Some(remote_client), + Some(Arc::new(remote_client)), Some(RemoteStartupData { index_part, remote_metadata, }), local_metadata, ancestor, + TimelineLoadCause::Attach, true, ctx, ) @@ -858,12 +772,13 @@ impl Tenant { /// If the loading fails for some reason, the Tenant will go into Broken /// state. /// - #[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))] + #[instrument(skip_all, fields(tenant_id=%tenant_id))] pub fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option<GenericRemoteStorage>, + cause: TimelineLoadCause, ctx: &RequestContext, ) -> Arc<Tenant> { let tenant_conf = match Self::load_tenant_config(conf, tenant_id) { @@ -897,7 +812,7 @@ "initial tenant load", false, async move { - match tenant_clone.load(&ctx).await { + match tenant_clone.load(cause, &ctx).await { Ok(()) => { info!("load finished, activating"); tenant_clone.activate(broker_client, &ctx); @@ -929,7 +844,11 @@ /// files on disk. Used at pageserver startup. /// /// No background tasks are started as part of this routine. - async fn load(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> { + async fn load( + self: &Arc<Self>, + cause: TimelineLoadCause, + ctx: &RequestContext, + ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); info!("loading tenant task"); @@ -944,95 +863,83 @@ // collect a list of timelines and their ancestors. let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new(); let timelines_dir = self.conf.timelines_path(&self.tenant_id); - for entry in std::fs::read_dir(&timelines_dir).with_context(|| { format!( "Failed to list timelines directory for tenant {}", self.tenant_id ) })?
{ - let entry = entry.with_context(|| { - format!("cannot read timeline dir entry for {}", self.tenant_id) - })?; - let timeline_dir = entry.path(); + let entries: Vec<DirEntry> = loop { + let mut entries = Vec::new(); + for entry in std::fs::read_dir(&timelines_dir).with_context(|| { + format!( + "Failed to list timelines directory for tenant {}", + self.tenant_id + ) + })? { + let entry = entry.with_context(|| { + format!("cannot read timeline dir entry for {}", self.tenant_id) + })?; + entries.push(entry); + } - if crate::is_temporary(&timeline_dir) { - info!( - "Found temporary timeline directory, removing: {}", - timeline_dir.display() - ); - if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { - error!( - "Failed to remove temporary directory '{}': {:?}", - timeline_dir.display(), - e - ); - } - } else if is_uninit_mark(&timeline_dir) { - let timeline_uninit_mark_file = &timeline_dir; - info!( - "Found an uninit mark file {}, removing the timeline and its uninit mark", - timeline_uninit_mark_file.display() - ); - let timeline_id = timeline_uninit_mark_file - .file_stem() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::<TimelineId>() - .with_context(|| { - format!( - "Could not parse timeline id out of the timeline uninit mark name {}", - timeline_uninit_mark_file.display() - ) - })?; - let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id); - if let Err(e) = - remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file) - { - error!("Failed to clean up uninit marked timeline: {e:?}"); - } - } else { - let timeline_id = timeline_dir - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::<TimelineId>() - .with_context(|| { - format!( - "Could not parse timeline id out of the timeline dir name {}", - timeline_dir.display() - ) - })?; - let timeline_uninit_mark_file = self - .conf - .timeline_uninit_mark_file_path(self.tenant_id, timeline_id); - if timeline_uninit_mark_file.exists() { + let mut removed_uninit_timeline = false; + for entry in &entries { + let timeline_dir = entry.path(); + if crate::is_temporary(&timeline_dir) { info!( - "Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark", - self.tenant_id, timeline_id + "Found temporary timeline directory, removing: {}", + timeline_dir.display() ); - if let Err(e) = - remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file) - { - error!("Failed to clean up uninit marked timeline: {e:?}"); + if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { + error!( + "Failed to remove temporary directory '{}': {:?}", + timeline_dir.display(), + e + ); } - continue; - } - - let file_name = entry.file_name(); - if let Ok(timeline_id) = - file_name.to_str().unwrap_or_default().parse::<TimelineId>() - { - let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) - .context("failed to load metadata")?; - timelines_to_load.insert(timeline_id, metadata); - } else { - // A file or directory that doesn't look like a timeline ID - warn!( - "unexpected file or directory in timelines directory: {}", - file_name.to_string_lossy() + } else if is_uninit_mark(&timeline_dir) { + let timeline_uninit_mark_file = &timeline_dir; + info!( + "Found an uninit mark file {}, removing the timeline and its uninit mark", + timeline_uninit_mark_file.display() ); + let timeline_id = timeline_uninit_mark_file + .file_stem() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::<TimelineId>() + .with_context(|| { + format!( + "Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display() + ) + })?; + let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id); + remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)?; + removed_uninit_timeline = true; } } + + if removed_uninit_timeline { + continue; + } + + break entries; + }; + + for entry in entries { + let timeline_dir = entry.path(); + assert!(!crate::is_temporary(&timeline_dir), "removed above"); + assert!(!is_uninit_mark(&timeline_dir), "removed above"); + let timeline_id = timeline_dir + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::<TimelineId>() + .with_context(|| { + format!( + "Could not parse timeline id out of the timeline dir name {}", + timeline_dir.display() + ) + })?; + let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) + .context("failed to load metadata")?; + timelines_to_load.insert(timeline_id, metadata); } // Sort the array of timeline IDs into tree-order, so that parent comes before @@ -1042,7 +949,7 @@ // 1. "Timeline has no ancestor and no layer files" for (timeline_id, local_metadata) in sorted_timelines { - self.load_local_timeline(timeline_id, local_metadata, ctx) + self.load_local_timeline(timeline_id, local_metadata, cause.clone(), ctx) .await .with_context(|| format!("load local timeline {timeline_id}"))?; } @@ -1060,17 +967,18 @@ &self, timeline_id: TimelineId, local_metadata: TimelineMetadata, + cause: TimelineLoadCause, ctx: &RequestContext, ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); let remote_client = self.remote_storage.as_ref().map(|remote_storage| { - RemoteTimelineClient::new( + Arc::new(RemoteTimelineClient::new( remote_storage.clone(), self.conf, self.tenant_id, timeline_id, - ) + )) }); let remote_startup_data = match &remote_client { @@ -1128,6 +1036,7 @@ remote_startup_data, Some(local_metadata), ancestor, + cause, false, ctx, ) @@ -1176,38 +1085,129 @@ /// This is used to create the initial 'main' timeline during bootstrapping, /// or when importing a new base backup. The caller is expected to load an /// initial image of the datadir to the new timeline after this. - pub fn create_empty_timeline( + /// + /// Cancel-safety: not cancel safe. + /// + /// TODO: pull in latest changes from create_timeline() + pub(crate) async fn create_empty_timeline( &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, - _ctx: &RequestContext, - ) -> anyhow::Result<UninitializedTimeline> { + ctx: &RequestContext, + ) -> anyhow::Result<(TimelineUninitMark, Arc<Timeline>)> { anyhow::ensure!( self.is_active(), "Cannot create empty timelines on inactive tenant" ); + // TODO: dedup with create_timeline - let timelines = self.timelines.lock().unwrap(); - let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; - drop(timelines); + // Reserve the timeline id, locking out any other tasks that might try to create the timeline.
+ let placeholder_timeline: Arc<Timeline> = loop { + match self.timelines.lock().unwrap().entry(new_timeline_id) { + Entry::Occupied(_) => { + anyhow::bail!("timeline {new_timeline_id} already exists"); + } + Entry::Vacant(v) => { + let placeholder = self.new_timeline_creating_placeholder(new_timeline_id); + v.insert(Arc::clone(&placeholder)); + break placeholder; + } + } + }; - let new_metadata = TimelineMetadata::new( - Lsn(0), - None, - None, - Lsn(0), - initdb_lsn, - initdb_lsn, - pg_version, - ); - self.prepare_timeline( + let uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?; + + // Create timeline on-disk & remote state. + // + // Use an async block to make sure we remove the uninit mark if the closure fails. + let create_ondisk_state = async { + let remote_client = self.remote_storage.as_ref().map(|remote_storage| { + Arc::new(RemoteTimelineClient::new( + remote_storage.clone(), + self.conf, + self.tenant_id, + new_timeline_id, + )) + }); + + let new_metadata = TimelineMetadata::new( + Lsn(0), // TODO should this be initdb_lsn as well, at least for the handle_import_basebackup use case? + None, + None, + Lsn(0), + initdb_lsn, + initdb_lsn, + pg_version, + ); + + self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, &new_metadata) + .context("create_timeline_files")?; + + if let Some(remote_client) = remote_client.as_ref() { + remote_client.init_upload_queue_for_empty_remote(&new_metadata)?; + } + + // XXX do we need to remove uninit mark before starting uploads? + // If we die with uninit mark present, we'll leak the uploaded state in S3. + + if let Some(remote_client) = remote_client.as_ref() { + // The branch_timeline / bootstrap_timeline functions are responsible for initializing + // the upload queue with the right metadata and scheduling an index upload. + // + // Here, we wait for those uploads to finish so that when we return + // Ok, the timeline is durable in remote storage. + remote_client + .wait_completion() + .await + .context("wait for initial uploads to complete")?; + } + Ok(()) + }; + match create_ondisk_state.await { + Ok(()) => {} + Err(err) => { + debug_assert_current_span_has_tenant_and_timeline_id(); + error!( + "failed to create on-disk state for new_timeline_id={new_timeline_id}: {err:#}" + ); + cleanup_timeline_directory(uninit_mark); + return Err(err); + } + } + + // From here on, it's just like during pageserver startup. + let metadata = load_metadata(self.conf, new_timeline_id, self.tenant_id) + .context("load newly created on-disk timeline metadata")?; + self.load_local_timeline( new_timeline_id, - &new_metadata, - timeline_uninit_mark, - true, - None, + metadata, + TimelineLoadCause::TimelineCreate { + placeholder_timeline: Arc::clone(&placeholder_timeline), + }, + ctx, ) + .await + .context("load newly created on-disk timeline state")?; + + let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) { + Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), + Entry::Occupied(entry) => { + assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); + assert!(!Arc::ptr_eq(&placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline"); + Arc::clone(entry.get()) + } + }; + + // Do not activate, the caller is responsible for that.
+ Ok((uninit_mark, real_timeline)) + + // TODO + // unfinished_timeline + // .layers + // .write() + // .unwrap() + // .next_open_layer_at = Some(initdb_lsn); } /// Helper for unit tests to create an emtpy timeline. @@ -1239,7 +1239,47 @@ /// the same timeline ID already exists, returns None. If `new_timeline_id` is not given, /// a new unique ID is generated. pub async fn create_timeline( - &self, + self: &Arc<Self>, + new_timeline_id: TimelineId, + ancestor_timeline_id: Option<TimelineId>, + ancestor_start_lsn: Option<Lsn>, + pg_version: u32, + broker_client: storage_broker::BrokerClientChannel, + ctx: &RequestContext, + ) -> anyhow::Result<Option<Arc<Timeline>>> { + let ctx = ctx.detached_child(TaskKind::CreateTimeline, DownloadBehavior::Warn); + let (tx, rx) = tokio::sync::oneshot::channel(); + let self_clone = Arc::clone(self); + task_mgr::spawn( + &tokio::runtime::Handle::current(), + TaskKind::CreateTimeline, + Some(self.tenant_id), + None, // this is a tenant-level operation + "create timeline", + false, + async move { + let res = self_clone + .create_timeline_task( + new_timeline_id, + ancestor_timeline_id, + ancestor_start_lsn, + pg_version, + broker_client, + &ctx, + ) + .await; + let _ = tx.send(res); // receiver may get dropped due to request cancellation + Ok(()) + } + // may outlive caller if caller is cancelled, yet, it's useful to have caller's request id in the logs + .instrument(tracing::info_span!( "create_timeline", tenant_id=%self.tenant_id)), + ); + rx.await.expect("task_mgr tasks run to completion") + } + + /// This is not cancel-safe. Run inside a task_mgr task. + async fn create_timeline_task( + self: &Arc<Self>, new_timeline_id: TimelineId, ancestor_timeline_id: Option<TimelineId>, mut ancestor_start_lsn: Option<Lsn>, @@ -1252,79 +1292,153 @@ "Cannot create timelines on inactive tenant" ); - if let Ok(existing) = self.get_timeline(new_timeline_id, false) { - debug!("timeline {new_timeline_id} already exists"); - - if let Some(remote_client) = existing.remote_client.as_ref() { - // Wait for uploads to complete, so that when we return Ok, the timeline - // is known to be durable on remote storage. Just like we do at the end of - // this function, after we have created the timeline ourselves. - // - // We only really care that the initial version of `index_part.json` has - // been uploaded. That's enough to remember that the timeline - // exists. However, there is no function to wait specifically for that so - // we just wait for all in-progress uploads to finish. - remote_client - .wait_completion() - .await - .context("wait for timeline uploads to complete")?; + // Reserve the timeline id, locking out any other tasks that might try to create the timeline.
+ let placeholder_timeline: Arc<Timeline> = loop { + match self.timelines.lock().unwrap().entry(new_timeline_id) { + Entry::Occupied(_) => { + anyhow::bail!("timeline {new_timeline_id} already exists"); + } + Entry::Vacant(v) => { + let placeholder = self.new_timeline_creating_placeholder(new_timeline_id); + v.insert(Arc::clone(&placeholder)); + break placeholder; + } } + }; + let _placeholder_guard = scopeguard::guard(self, |self_clone| { + let Ok(mut timelines) = self_clone.timelines.lock() else { + error!("timelines lock poisoned, not removing placeholder timeline"); + return; + }; + match timelines.entry(new_timeline_id) { + Entry::Occupied(entry) => { + if Arc::ptr_eq(&placeholder_timeline, entry.get()) { + debug!("removing placeholder timeline"); + entry.remove(); + } else { + info!("placeholder timeline was replaced with another timeline, not removing it"); + return; + } + } + Entry::Vacant(_) => { + error!("either placeholder timeline or real timeline should be present in the timelines map"); + } + } + }); - return Ok(None); - } + let uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?; - let loaded_timeline = match ancestor_timeline_id { - Some(ancestor_timeline_id) => { - let ancestor_timeline = self - .get_timeline(ancestor_timeline_id, false) - .context("Cannot branch off the timeline that's not present in pageserver")?; + // Create timeline on-disk & remote state. + // + // Use an async block to make sure we remove the uninit mark if the closure fails. + let create_ondisk_state = async { + let remote_client = self.remote_storage.as_ref().map(|remote_storage| { + Arc::new(RemoteTimelineClient::new( + remote_storage.clone(), + self.conf, + self.tenant_id, + new_timeline_id, + )) + }); - if let Some(lsn) = ancestor_start_lsn.as_mut() { - *lsn = lsn.align(); + let _: () = match ancestor_timeline_id { + Some(ancestor_timeline_id) => { + let ancestor_timeline = + self.get_timeline(ancestor_timeline_id, false).context( + "Cannot branch off the timeline that's not present in pageserver", + )?; - let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn(); - if ancestor_ancestor_lsn > *lsn { - // can we safely just branch from the ancestor instead? - bail!( - "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}", - lsn, - ancestor_timeline_id, - ancestor_ancestor_lsn, - ); + if let Some(lsn) = ancestor_start_lsn.as_mut() { + *lsn = lsn.align(); + + let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn(); + if ancestor_ancestor_lsn > *lsn { + // can we safely just branch from the ancestor instead? + bail!( + "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}", + lsn, + ancestor_timeline_id, + ancestor_ancestor_lsn, + ); + } + + // Wait for the WAL to arrive and be processed on the parent branch up + // to the requested branch point. The repository code itself doesn't + // require it, but if we start to receive WAL on the new timeline, + // decoding the new WAL might need to look up previous pages, relation + // sizes etc.
and that would get confused if the previous page versions - // are not in the repository yet. - ancestor_timeline.wait_lsn(*lsn, ctx).await?; + self.branch_timeline( + &ancestor_timeline, + new_timeline_id, + ancestor_start_lsn, + remote_client, + &uninit_mark, + ctx, + ) + .await?; } + None => { + self.bootstrap_timeline( + new_timeline_id, + pg_version, + &uninit_mark, + remote_client, + ctx, + ) + .await?; + } + }; - self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx) - .await? + // XXX do we need to remove uninit mark before the self.branch_timeline / self.bootstrap_timeline start the uploads? + // If we die with uninit mark present, we'll leak the uploaded state in S3. + Ok(()) + }; + match create_ondisk_state.await { + Ok(()) => { + uninit_mark + .remove_uninit_mark() + .context("remove uninit mark")?; } - None => { - self.bootstrap_timeline(new_timeline_id, pg_version, ctx) - .await? + Err(err) => { + debug_assert_current_span_has_tenant_and_timeline_id(); + error!( + "failed to create on-disk state for new_timeline_id={new_timeline_id}: {err:#}" + ); + cleanup_timeline_directory(uninit_mark); + return Err(err); + } + } + + // From here on, it's just like during pageserver startup. + let metadata = load_metadata(self.conf, new_timeline_id, self.tenant_id) + .context("load newly created on-disk timeline metadata")?; + self.load_local_timeline( + new_timeline_id, + metadata, + TimelineLoadCause::TimelineCreate { + placeholder_timeline: Arc::clone(&placeholder_timeline), + }, + ctx, + ) + .await + .context("load newly created on-disk timeline state")?; + + let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) { + Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), + Entry::Occupied(entry) => { + assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); + assert!(!Arc::ptr_eq(&placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline"); + Arc::clone(entry.get()) } }; - loaded_timeline.activate(broker_client, ctx); + real_timeline.activate(broker_client, ctx); - if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { - // Wait for the upload of the 'index_part.json` file to finish, so that when we return - // Ok, the timeline is durable in remote storage. - let kind = ancestor_timeline_id - .map(|_| "branched") - .unwrap_or("bootstrapped"); - remote_client.wait_completion().await.with_context(|| { - format!("wait for {} timeline initial uploads to complete", kind) - })?; - } - - Ok(Some(loaded_timeline)) + Ok(Some(real_timeline)) } /// perform one garbage collection iteration, removing old data files from disk. 
@@ -1408,7 +1522,13 @@ impl Tenant { }; for timeline in &timelines_to_flush { - timeline.freeze_and_flush().await?; + timeline.freeze_and_flush().await.with_context(|| { + format!( + "freeze_and_flush timeline {} (state={:?})", + timeline.timeline_id, + timeline.current_state() + ) + })?; } Ok(()) } @@ -1959,7 +2079,7 @@ impl Tenant { new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, ancestor: Option<Arc<Timeline>>, - remote_client: Option<RemoteTimelineClient>, + remote_client: Option<Arc<RemoteTimelineClient>>, ) -> anyhow::Result<Arc<Timeline>> { if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() { anyhow::ensure!( @@ -1979,9 +2099,37 @@ Arc::clone(&self.walredo_mgr), remote_client, pg_version, + false, )) } + fn new_timeline_creating_placeholder(&self, timeline_id: TimelineId) -> Arc<Timeline> { + // copied this from unit tests + let dummy_metadata = TimelineMetadata::new( + Lsn(0), + None, + None, + Lsn(0), + Lsn(0), + Lsn(0), + // Any version will do + // but it should be consistent with the one in the tests + crate::DEFAULT_PG_VERSION, + ); + Timeline::new( + self.conf, + Arc::clone(&self.tenant_conf), + &dummy_metadata, + None, + timeline_id, + self.tenant_id, + Arc::clone(&self.walredo_mgr), + None, + crate::DEFAULT_PG_VERSION, + true, + ) + } + fn new( state: TenantState, conf: &'static PageServerConf, @@ -2340,27 +2488,35 @@ impl Tenant { src_timeline: &Arc<Timeline>, dst_id: TimelineId, start_lsn: Option<Lsn>, + remote_client: Option<Arc<RemoteTimelineClient>>, ctx: &RequestContext, ) -> anyhow::Result<Arc<Timeline>> { let tl = self - .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx) + .branch_timeline_impl(src_timeline, dst_id, start_lsn, remote_client, ctx) .await?; tl.set_state(TimelineState::Active); Ok(tl) } - /// Branch an existing timeline. - /// - /// The caller is responsible for activating the returned timeline. + /// Branch an existing timeline, creating local and remote files. async fn branch_timeline( &self, src_timeline: &Arc<Timeline>, dst_id: TimelineId, start_lsn: Option<Lsn>, + remote_client: Option<Arc<RemoteTimelineClient>>, + uninit_mark: &TimelineUninitMark, ctx: &RequestContext, - ) -> anyhow::Result<Arc<Timeline>> { - self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx) - .await + ) -> anyhow::Result<()> { + self.branch_timeline_impl( + src_timeline, + dst_id, + start_lsn, + remote_client, + uninit_mark, + ctx, + ) + .await } async fn branch_timeline_impl( @@ -2368,8 +2524,10 @@ src_timeline: &Arc<Timeline>, dst_id: TimelineId, start_lsn: Option<Lsn>, - ctx: &RequestContext, - ) -> anyhow::Result<Arc<Timeline>> { + remote_client: Option<Arc<RemoteTimelineClient>>, + uninit_mark: &TimelineUninitMark, + _ctx: &RequestContext, + ) -> anyhow::Result<()> { let src_id = src_timeline.timeline_id; // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN @@ -2384,13 +2542,6 @@ // creating the branch. let _gc_cs = self.gc_cs.lock().await; - // Create a placeholder for the new branch. This will error - // out if the new timeline ID is already in use. - let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); - self.create_timeline_uninit_mark(dst_id, &timelines)? - }; - // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR // horizon on the source timeline // @@ -2453,48 +2604,45 @@ src_timeline.pg_version, ); - let new_timeline = { - let mut timelines = self.timelines.lock().unwrap(); - self.prepare_timeline( - dst_id, - &metadata, - timeline_uninit_mark, - false, - Some(Arc::clone(src_timeline)), - )? - .initialize_with_lock(ctx, &mut timelines, true)?
- }; + self.create_timeline_files(&uninit_mark.timeline_path, dst_id, &metadata) + .context("create timeline files")?; // Root timeline gets its layers during creation and uploads them along with the metadata. // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created. // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC // could get incorrect information and remove more layers, than needed. // See also https://github.com/neondatabase/neon/issues/3865 - if let Some(remote_client) = new_timeline.remote_client.as_ref() { + if let Some(remote_client) = remote_client.as_ref() { + remote_client.init_upload_queue_for_empty_remote(&metadata)?; remote_client .schedule_index_upload_for_metadata_update(&metadata) .context("branch initial metadata upload")?; + remote_client + .wait_completion() + .await + .context("wait for initial uploads to complete")?; } + // XXX log message is a little too early, see caller for context info!("branched timeline {dst_id} from {src_id} at {start_lsn}"); - Ok(new_timeline) + Ok(()) } /// - run initdb to init temporary instance and get bootstrap data /// - after initialization complete, remove the temp dir. /// - /// The caller is responsible for activating the returned timeline. + /// This method takes ownership of the remote_client and finishes uploads itself. async fn bootstrap_timeline( &self, timeline_id: TimelineId, pg_version: u32, + uninit_mark: &TimelineUninitMark, + remote_client: Option<Arc<RemoteTimelineClient>>, ctx: &RequestContext, - ) -> anyhow::Result<Arc<Timeline>> { - let timeline_uninit_mark = { - let timelines = self.timelines.lock().unwrap(); - self.create_timeline_uninit_mark(timeline_id, &timelines)? - }; + ) -> anyhow::Result<()> { + let tenant_id = self.tenant_id; + // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // temporary directory for basebackup files for the given timeline. let initdb_path = path_with_suffix_extension( @@ -2539,14 +2687,39 @@ pgdata_lsn, pg_version, ); - let raw_timeline = - self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?; - let tenant_id = raw_timeline.owning_tenant.tenant_id; - let unfinished_timeline = raw_timeline.raw_timeline()?; + self.create_timeline_files(&uninit_mark.timeline_path, timeline_id, &new_metadata) + .context("create timeline files")?; + + if let Some(remote_client) = remote_client.as_ref() { + remote_client.init_upload_queue_for_empty_remote(&new_metadata)?; + // the freeze_and_flush below will schedule the metadata upload + } + + // Temporarily create a timeline object to allow the import to run in it. + + let remote_client_refcount_before = remote_client + .as_ref() + .map(|rc| (Arc::strong_count(rc), Arc::weak_count(rc))); + // Ensure the remote_client hasn't leaked into some global state. + // TODO: move ownership into the unfinished_timeline and back out.
+ scopeguard::defer!( + let remote_client_refcount_after = remote_client.as_ref().map(|rc| (Arc::strong_count(rc), Arc::weak_count(rc))); + assert_eq!(remote_client_refcount_before, remote_client_refcount_after, "the remote_client must not leak this function call graph") + ); + + let unfinished_timeline = self + .create_timeline_data(timeline_id, &new_metadata, None, remote_client.clone()) + .context("Failed to create timeline data structure")?; + + unfinished_timeline + .layers + .write() + .unwrap() + .next_open_layer_at = Some(pgdata_lsn); // pgdata_lsn == initdb_lsn import_datadir::import_timeline_from_postgres_datadir( - unfinished_timeline, + &*unfinished_timeline, pgdata_path, pgdata_lsn, ctx, @@ -2575,74 +2748,29 @@ impl Tenant { ) })?; - // Initialize the timeline without loading the layer map, because we already updated the layer - // map above, when we imported the datadir. - let timeline = { - let mut timelines = self.timelines.lock().unwrap(); - raw_timeline.initialize_with_lock(ctx, &mut timelines, false)? - }; + let last_record_lsn = unfinished_timeline.get_last_record_lsn(); + // Tear down the temporary timeline. + // XXX this should be a shared Timeline::shutdown method. + + if let Some(remote_client) = remote_client.as_ref() { + remote_client + .wait_completion() + .await + .context("wait for uploads to complete so we can stop the unfinished_timeline")?; + } + + // XXX this is same shutdown code as in Timeline::delete, share it. + unfinished_timeline.set_state(TimelineState::Stopping); + task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await; + + // XXX log message is a little too early, see caller for context info!( "created root timeline {} timeline.lsn {}", - timeline_id, - timeline.get_last_record_lsn() + timeline_id, last_record_lsn, ); - Ok(timeline) - } - - /// Creates intermediate timeline structure and its files, without loading it into memory. - /// It's up to the caller to import the necesary data and import the timeline into memory. 
- fn prepare_timeline( - &self, - new_timeline_id: TimelineId, - new_metadata: &TimelineMetadata, - uninit_mark: TimelineUninitMark, - init_layers: bool, - ancestor: Option<Arc<Timeline>>, - ) -> anyhow::Result<UninitializedTimeline> { - let tenant_id = self.tenant_id; - - let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() { - let remote_client = RemoteTimelineClient::new( - remote_storage.clone(), - self.conf, - tenant_id, - new_timeline_id, - ); - remote_client.init_upload_queue_for_empty_remote(new_metadata)?; - Some(remote_client) - } else { - None - }; - - match self.create_timeline_files( - &uninit_mark.timeline_path, - new_timeline_id, - new_metadata, - ancestor, - remote_client, - ) { - Ok(new_timeline) => { - if init_layers { - new_timeline.layers.write().unwrap().next_open_layer_at = - Some(new_timeline.initdb_lsn); - } - debug!( - "Successfully created initial files for timeline {tenant_id}/{new_timeline_id}" - ); - Ok(UninitializedTimeline { - owning_tenant: self, - timeline_id: new_timeline_id, - raw_timeline: Some((new_timeline, uninit_mark)), - }) - } - Err(e) => { - error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}"); - cleanup_timeline_directory(uninit_mark); - Err(e) - } - } + Ok(()) } fn create_timeline_files( @@ -2650,12 +2778,7 @@ timeline_path: &Path, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, - ancestor: Option<Arc<Timeline>>, - remote_client: Option<RemoteTimelineClient>, - ) -> anyhow::Result<Arc<Timeline>> { - let timeline_data = self - .create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client) - .context("Failed to create timeline data structure")?; + ) -> anyhow::Result<()> { crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?; fail::fail_point!("after-timeline-uninit-mark-creation", |_| { @@ -2671,7 +2794,7 @@ ) .context("Failed to create timeline metadata")?; - Ok(timeline_data) + Ok(()) } /// Attempts to create an uninit mark file for the timeline initialization.
@@ -2681,14 +2804,9 @@ fn create_timeline_uninit_mark( &self, timeline_id: TimelineId, - timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>, ) -> anyhow::Result<TimelineUninitMark> { let tenant_id = self.tenant_id; - anyhow::ensure!( - timelines.get(&timeline_id).is_none(), - "Timeline {tenant_id}/{timeline_id} already exists in pageserver's memory" - ); let timeline_path = self.conf.timeline_path(&timeline_id, &tenant_id); anyhow::ensure!( !timeline_path.exists(), @@ -2699,7 +2817,7 @@ let uninit_mark_path = self .conf .timeline_uninit_mark_file_path(tenant_id, timeline_id); - fs::File::create(&uninit_mark_path) + fs::File::create(&uninit_mark_path) // XXX create_new .context("Failed to create uninit mark file") .and_then(|_| { crashsafe::fsync_file_and_parent(&uninit_mark_path) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d40468a5b3..81bf967669 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -19,7 +19,9 @@ use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; -use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; +use crate::tenant::{ + create_tenant_files, CreateTenantFilesMode, Tenant, TenantState, TimelineLoadCause, +}; use crate::IGNORED_TENANT_FILE_NAME; use utils::fs_ext::PathExt; @@ -119,6 +121,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), + TimelineLoadCause::Startup, &ctx, ) { Ok(tenant) => { @@ -154,6 +157,7 @@ pub fn schedule_local_tenant_processing( tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option<GenericRemoteStorage>, + cause: TimelineLoadCause, ctx: &RequestContext, ) -> anyhow::Result<Arc<Tenant>> { anyhow::ensure!( @@ -207,7 +211,7 @@ } else { info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); // Start loading the tenant into memory. It will initially be in Loading state. - Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx) + Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, cause, ctx) }; Ok(tenant) } @@ -289,7 +293,7 @@ pub async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?; + schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, TimelineLoadCause::TenantCreate, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233 @@ -435,7 +439,7 @@ pub async fn load_tenant( .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; } - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx) + let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, TimelineLoadCause::TenantLoad, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -508,7 +512,7 @@ pub async fn attach_tenant( .context("check for attach marker file existence")?; anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), TimelineLoadCause::Attach, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d48c247c21..80ea44d26f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -63,13 +63,13 @@ use utils::{ simple_rcu::{Rcu, RcuReadGuard}, }; -use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::task_mgr::TaskKind; use crate::walredo::WalRedoManager; use crate::METADATA_FILE_NAME; use crate::ZERO_PAGE; +use crate::{import_datadir, page_cache}; use crate::{is_temporary, task_mgr}; pub(super) use self::eviction_task::EvictionTaskTenantState; @@ -664,12 +664,25 @@ impl Timeline { /// Flush to disk all data that was written with the put_* functions #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))] pub async fn freeze_and_flush(&self) -> anyhow::Result<()> { + if self.current_state() == TimelineState::Creating { + debug!("timelines in Creating state are never written to"); + assert!( + self.layers.read().unwrap().open_layer.is_none(), + "would have nothing to flush anyways" + ); + return Ok(()); + } self.freeze_inmem_layer(false); self.flush_frozen_layers_and_wait().await } /// Outermost timeline compaction operation; downloads needed layers.
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> { + if self.current_state() == TimelineState::Creating { + debug!("timeline is in Creating state"); + return Ok(()); + } + const ROUNDS: usize = 2; let last_record_lsn = self.get_last_record_lsn(); @@ -917,12 +930,30 @@ } pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) { + if self.current_state() == TimelineState::Creating { + panic!("timelines in Creating state are never activated"); + } + self.maybe_spawn_flush_loop(); self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); self.launch_eviction_task(); } pub fn set_state(&self, new_state: TimelineState) { + if self.current_state() == TimelineState::Creating { + info!("timelines in Creating state are never activated, nothing to stop"); + assert_eq!( + *self.flush_loop_state.lock().unwrap(), + FlushLoopState::NotStarted + ); + assert!( + self.layers.read().unwrap().open_layer.is_none(), + "would have nothing to flush anyways" + ); + assert!(self.walreceiver.lock().unwrap().is_none()); + // TODO: assert other tasks launched in activate are not running + return; + } match (self.current_state(), new_state) { (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); @@ -962,6 +993,14 @@ loop { let current_state = *receiver.borrow_and_update(); match current_state { + TimelineState::Creating => { + // A timeline _object_ in state Creating never transitions out of it. + // It gets replaced by another object in Loading state once creation is done. + // So, `self` is not the right object to subscribe to. + // Luckily, there's no code path that calls this function. + // But let's error out instead of an unreachable, just to be on the safe side. + return Err(current_state); + } TimelineState::Loading => { receiver .changed() @@ -1305,7 +1344,6 @@ .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold); } } - /// Open a Timeline handle. /// /// Loads the metadata for the timeline into memory, but not the layer map. @@ -1318,11 +1356,16 @@ timeline_id: TimelineId, tenant_id: TenantId, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, - remote_client: Option<RemoteTimelineClient>, + remote_client: Option<Arc<RemoteTimelineClient>>, pg_version: u32, + is_create_placeholder: bool, ) -> Arc<Timeline> { let disk_consistent_lsn = metadata.disk_consistent_lsn(); - let (state, _) = watch::channel(TimelineState::Loading); + let (state, _) = watch::channel(if is_create_placeholder { + TimelineState::Creating + } else { + TimelineState::Loading + }); let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0); let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(()))); @@ -1350,7 +1393,7 @@ walredo_mgr, walreceiver: Mutex::new(None), - remote_client: remote_client.map(Arc::new), + remote_client, // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. last_record_lsn: SeqWait::new(RecordLsn { @@ -1366,7 +1409,7 @@ ancestor_lsn: metadata.ancestor_lsn(), metrics: TimelineMetrics::new( - false, + is_create_placeholder, &tenant_id, &timeline_id, crate::metrics::EvictionsWithLowResidenceDurationBuilder::new( @@ -1514,6 +1557,33 @@ )); } + /// Prepares timeline data by loading it from the basebackup archive.
+ pub(crate) async fn import_basebackup_from_tar( + self: &Arc<Self>, + copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), + base_lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + import_datadir::import_basebackup_from_tar(self, copyin_read, base_lsn, ctx) + .await + .context("Failed to import basebackup")?; + + // Flush loop needs to be spawned in order to be able to flush. + // We want to run proper checkpoint before we mark timeline as available to outside world + // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock + self.maybe_spawn_flush_loop(); + + fail::fail_point!("before-checkpoint-new-timeline", |_| { + bail!("failpoint before-checkpoint-new-timeline"); + }); + + self.freeze_and_flush() + .await + .context("Failed to flush after basebackup import")?; + + Ok(()) + } + /// /// Scan the timeline directory to populate the layer map. /// Returns all timeline-related files that were found and loaded. @@ -2016,6 +2086,12 @@ ) -> Result<u64, CalculateLogicalSizeError> { debug_assert_current_span_has_tenant_and_timeline_id(); + if self.current_state() == TimelineState::Creating { + return Err(CalculateLogicalSizeError::Other(anyhow!( + "cannot calculate logical size for timeline in Creating state" + ))); + } + let mut timeline_state_updates = self.subscribe_for_state_updates(); let self_calculation = Arc::clone(self); @@ -2036,7 +2112,8 @@ TimelineState::Active => continue, TimelineState::Broken | TimelineState::Stopping - | TimelineState::Loading => { + | TimelineState::Loading + | TimelineState::Creating => { break format!("aborted because timeline became inactive (new state: {new_state:?})") } } @@ -3734,6 +3811,11 @@ let now = SystemTime::now(); let mut result: GcResult = GcResult::default(); + if self.current_state() == TimelineState::Creating { + debug!("timeline creating placeholder does not need GC"); + return Ok(GcResult::default()); + } + // Nothing to GC. Return early. let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn(); if latest_gc_cutoff >= new_gc_cutoff { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 6b65e1fd42..12a598cb84 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -148,6 +148,7 @@ pub(super) async fn connection_manager_loop_step( Ok(()) => { let new_state = connection_manager_state.timeline.current_state(); match new_state { + TimelineState::Creating => unreachable!("walreceiver should never be launched on a timeline in Creating state"), // we're already active as walreceiver, no need to reactivate TimelineState::Active => continue, TimelineState::Broken | TimelineState::Stopping => { diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index aefc8befeb..51a0caca0e 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -2,12 +2,11 @@ # env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import os -import queue import shutil import threading import time from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Tuple import pytest from fixtures.log_helper import log @@ -656,21 +655,26 @@ def test_empty_branch_remote_storage_upload( @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) -def test_empty_branch_remote_storage_upload_on_restart( +def test_empty_branch_remote_storage_upload_failure( neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, ): """ - Branches off a root branch, but does not write anything to the new branch, so - it has a metadata file only. + Branching is not acknowledged until the index_part.json is uploaded. - Ensures the branch is not on the remote storage and restarts the pageserver - — the upload should be scheduled by load, and create_timeline should await - for it even though it gets 409 Conflict. + Fails the index_part.json upload with a failpoint. + Ensures that timeline creation fails because of that. + Stops the pageserver and ensures that neither the uninit mark nor the + timeline directory survives the orderly shutdown. + + Restarts the pageserver without the failpoint. + Waits for the tenant to become Active. + Retries the timeline creation and ensures it succeeds, locally and remotely. """ neon_env_builder.enable_remote_storage( remote_storage_kind=remote_storage_kind, - test_name="test_empty_branch_remote_storage_upload_on_restart", + test_name="test_empty_branch_remote_storage_upload_failure", ) env = neon_env_builder.init_start() @@ -696,12 +700,21 @@ # index upload is now hitting the failpoint, should not block the shutdown env.pageserver.stop() + env.pageserver.allowed_errors.append( + f".*failed to create on-disk state for new_timeline_id={new_branch_timeline_id}.*wait for initial uploads to complete.*upload queue was stopped" + ) + timeline_path = ( Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id) ) + uninit_marker_path = env.repo_dir / timeline_path.with_suffix(".___uninit") - local_metadata = env.repo_dir / timeline_path / "metadata" - assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload" + assert ( + not uninit_marker_path.exists() + ), "uninit marker should be deleted during orderly shutdown" + assert not ( + env.repo_dir / timeline_path + ).exists(), "unfinished timeline dir should be deleted during orderly shutdown" assert isinstance(env.remote_storage, LocalFsStorage) new_branch_on_remote_storage = env.remote_storage.root / timeline_path @@ -709,54 +722,26 @@ not new_branch_on_remote_storage.exists() ), "failpoint should had prohibited index_part.json upload" - # during reconciliation we should had scheduled the uploads and on the - # retried create_timeline, we will await for those to complete on next - # client.timeline_create - env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"}) + # restart without failpoint + env.pageserver.start() - # sleep a bit to force the upload task go into exponential backoff - time.sleep(1) + wait_until_tenant_state(client, env.initial_tenant, "Active", 5) - q: queue.Queue[Optional[PageserverApiException]] = queue.Queue() - barrier = threading.Barrier(2) + # retry creation + client.timeline_create( + tenant_id=env.initial_tenant, + ancestor_timeline_id=env.initial_timeline, + new_timeline_id=new_branch_timeline_id, + pg_version=env.pg_version, + ) - def
create_in_background(): - barrier.wait() - try: - client.timeline_create( - tenant_id=env.initial_tenant, - ancestor_timeline_id=env.initial_timeline, - new_timeline_id=new_branch_timeline_id, - pg_version=env.pg_version, - ) - q.put(None) - except PageserverApiException as e: - q.put(e) + assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id) - create_thread = threading.Thread(target=create_in_background) - create_thread.start() - - try: - # maximize chances of actually waiting for the uploads by create_timeline - barrier.wait() - - assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading" - - client.configure_failpoints(("before-upload-index", "off")) - conflict = q.get() - - assert conflict, "create_timeline should not have succeeded" - assert ( - conflict.status_code == 409 - ), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict" - - assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id) - - assert ( - new_branch_on_remote_storage / "index_part.json" - ).is_file(), "uploads scheduled during initial load should had been awaited for" - finally: - create_thread.join() + assert (env.repo_dir / timeline_path).exists() + assert not uninit_marker_path.exists() + assert ( + new_branch_on_remote_storage / "index_part.json" + ).is_file(), "index_part.json should have been uploaded by the retried timeline creation" def wait_upload_queue_empty(