diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 64f1236f27..050d3f642b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -14,6 +14,7 @@ use bytes::Buf; use bytes::Bytes; use futures::Stream; use pageserver_api::models::TenantState; +use pageserver_api::models::TimelineState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -24,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; +use std::collections::hash_map::Entry; use std::io; use std::net::TcpListener; use std::pin::pin; @@ -51,6 +53,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant; +use crate::tenant::compare_arced_timeline; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; use crate::tenant::mgr::GetTenantError; @@ -491,10 +494,14 @@ impl PageServerHandler { info!("creating new timeline"); let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; - let (guard, timeline) = tenant + let (guard, real_timeline_not_in_tenants_map) = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; + // TODO spawn flush loop of timeline early (before activation), + // but then we need to take care of shutting it down in case we fail + // (bootstrap_timeline probably also needs it?) + // TODO mark timeline as not ready until it reaches end_lsn. // We might have some wal to import as well, and we should prevent compute // from connecting before that and writing conflicting wal. 
@@ -512,7 +519,7 @@ impl PageServerHandler { pgb.flush().await?; let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb))); - timeline + real_timeline_not_in_tenants_map .import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx) .await?; @@ -520,14 +527,10 @@ impl PageServerHandler { read_tar_eof(copyin_reader).await?; anyhow::Ok(()) }; - match doit.await { + let placeholder_timeline = match doit.await { Ok(()) => { match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() { - Ok(placeholder_timeline) => { - // create_empty_timeline already replaced the placeholder timeline with the real one. - // However, we still need to remove the placeholder. - let _ = placeholder_timeline; // don't need it anymore - } + Ok(placeholder_timeline) => placeholder_timeline, Err(err) => { error!( "failed to remove uninit marker for new_timeline_id={timeline_id}: {err:#}" @@ -538,10 +541,21 @@ impl PageServerHandler { } Err(e) => { debug_assert_current_span_has_tenant_and_timeline_id(); - error!("error importing basebackup: {:?}", e); guard.creation_failed(); return Err(QueryError::Other(e)); } + }; + + // todo share with Tenant::create_timeline + match tenant.timelines.lock().unwrap().entry(timeline_id) { + Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), + Entry::Occupied(mut o) => { + info!("replacing placeholder timeline with the real one"); + assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); + assert!(compare_arced_timeline(&placeholder_timeline, o.get())); + let replaced_placeholder = o.insert(Arc::clone(&real_timeline_not_in_tenants_map)); + assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline)); + }, } // TODO check checksum @@ -551,7 +565,7 @@ impl PageServerHandler { // since we discard some log files. 
info!("done, activating timeline"); - timeline.activate(self.broker_client.clone(), &ctx); + real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx); Ok(()) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e698c9ab5e..e718454628 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -136,7 +136,7 @@ pub struct Tenant { tenant_conf: Arc>, tenant_id: TenantId, - timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>, + pub(super) timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>, // This mutex prevents creation of new timelines during GC. // Adding yet another mutex (in addition to `timelines`) is needed because holding // `timelines` mutex during all GC iteration @@ -158,7 +158,7 @@ pub struct Tenant { /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables. #[inline(always)] -fn compare_arced_timeline(left: &Arc<Timeline>, right: &Arc<Timeline>) -> bool { +pub(crate) fn compare_arced_timeline(left: &Arc<Timeline>, right: &Arc<Timeline>) -> bool { // See: https://github.com/rust-lang/rust/issues/103763 // See: https://github.com/rust-lang/rust/pull/106450 let left = Arc::as_ptr(left) as *const (); @@ -307,6 +307,18 @@ impl<'t> CreatingTimelineGuard<'t> { } } +/// Newtype to avoid confusing local variables that are both Arc<Timeline> +struct AncestorArg(Option<Arc<Timeline>>); + +impl AncestorArg { + pub fn ancestor(ancestor: Arc<Timeline>) -> Self { + Self(Some(ancestor)) + } + pub fn no_ancestor() -> Self { + Self(None) + } +} + // We should not blindly overwrite local metadata with remote one. // For example, consider the following case: // Image layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that @@ -455,18 +467,49 @@ impl Tenant { /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success, /// it is marked as Active. 
#[allow(clippy::too_many_arguments)] + #[instrument(skip_all, fields(?cause))] async fn timeline_init_and_sync( &self, timeline_id: TimelineId, remote_client: Option>, remote_startup_data: Option, local_metadata: Option, - ancestor: Option>, + ancestor: AncestorArg, cause: TimelineLoadCause, first_save: bool, // TODO need to think about this _ctx: &RequestContext, ) -> anyhow::Result> { + debug_assert_current_span_has_tenant_and_timeline_id(); + let tenant_id = self.tenant_id; + let ancestor = ancestor.0; + + match ( + local_metadata.as_ref().map(|lmd| lmd.ancestor_timeline()), + remote_startup_data + .as_ref() + .map(|rsd| rsd.remote_metadata.ancestor_timeline()), + ) { + (Some(local_ancestor), Some(remote_ancestor)) => { + assert_eq!( + local_ancestor, remote_ancestor, + "local and remote ancestor timelines should match" + ); + } + (None, None) => {} + (local, remote) => { + anyhow::bail!("local and remote metadata do not agree on ancestorship, local={local:?} remote={remote:?}"); + } + } + assert_eq!( + ancestor.as_ref().map(|a| a.timeline_id), + // we could check either local or remote metadata, it doesn't matter, + // we checked above that they're either (None, None) or (Some, Some) + local_metadata + .as_ref() + .and_then(|lmd| lmd.ancestor_timeline()), + "caller does not provide correct ancestor" + ); let (up_to_date_metadata, picked_local) = merge_local_remote_metadata( local_metadata.as_ref(), @@ -475,6 +518,12 @@ impl Tenant { .context("merge_local_remote_metadata")? 
.to_owned(); + assert_eq!( + up_to_date_metadata.ancestor_timeline(), + ancestor.as_ref().map(|a| a.timeline_id), + "merge_local_remote_metadata should not change ancestor" + ); + let timeline = { let timeline = self.create_timeline_data( timeline_id, @@ -492,31 +541,7 @@ impl Tenant { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; - // avoiding holding it across awaits - let mut timelines_accessor = self.timelines.lock().unwrap(); - match timelines_accessor.entry(timeline_id) { - Entry::Occupied(mut e) => { - match cause { - TimelineLoadCause::TenantCreate => unreachable!("tenant creates no timelines, so, we don't reach here"), - TimelineLoadCause::Startup | - TimelineLoadCause::TenantLoad | - TimelineLoadCause::Attach => unreachable!("when loading a full tenant, the loading entity is responsible for ensuring there are no duplicates, cause={cause:?}"), - TimelineLoadCause::TimelineCreate { placeholder_timeline } => { - // replace placeholder with real one - assert!(compare_arced_timeline(e.get(), &placeholder_timeline), "when creating a timeline, the placeholder timeline should be the one in the map"); - e.insert(Arc::clone(&timeline)); - timeline - }, - #[cfg(test)] - TimelineLoadCause::Test => todo!(), - } - } - Entry::Vacant(v) => { - v.insert(Arc::clone(&timeline)); - timeline.maybe_spawn_flush_loop(); - timeline - } - } + timeline }; if self.remote_storage.is_some() { @@ -724,7 +749,27 @@ impl Tenant { .expect("just put it in above"); // TODO again handle early failure - self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx) + let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() { + let timelines = self.timelines.lock().unwrap(); + AncestorArg::ancestor(Arc::clone(timelines.get(&ancestor_id).ok_or_else( + || { + anyhow::anyhow!( + "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}" + ) + }, + )?)) + } else { + AncestorArg::no_ancestor() + }; + let 
timeline = self + .load_remote_timeline( + timeline_id, + index_part, + remote_metadata, + ancestor, + remote_client, + ctx, + ) .await .with_context(|| { format!( @@ -732,6 +777,16 @@ impl Tenant { timeline_id, self.tenant_id ) })?; + // TODO: why can't load_remote_timeline return None like load_local_timeline does? + + let mut timelines = self.timelines.lock().unwrap(); + let overwritten = timelines.insert(timeline_id, Arc::clone(&timeline)); + if let Some(overwritten) = overwritten { + panic!( + "timeline should not be in the map yet, but is: {timeline_id}: {:?}", + overwritten.current_state() + ); + } } std::fs::remove_file(&marker_file) @@ -768,9 +823,10 @@ impl Tenant { timeline_id: TimelineId, index_part: IndexPart, remote_metadata: TimelineMetadata, + ancestor: AncestorArg, remote_client: RemoteTimelineClient, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { debug_assert_current_span_has_tenant_id(); info!("downloading index file for timeline {}", timeline_id); @@ -778,19 +834,6 @@ impl Tenant { .await .context("Failed to create new timeline directory")?; - let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() { - let timelines = self.timelines.lock().unwrap(); - Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else( - || { - anyhow::anyhow!( - "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}" - ) - }, - )?)) - } else { - None - }; - // Even if there is local metadata it cannot be ahead of the remote one // since we're attaching. Even if we resume interrupted attach remote one // cannot be older than the local one @@ -809,8 +852,7 @@ impl Tenant { true, ctx, ) - .await?; - Ok(()) + .await } /// Create a placeholder Tenant object for a broken tenant @@ -1021,9 +1063,38 @@ impl Tenant { // 1. 
"Timeline has no ancestor and no layer files" for (timeline_id, local_metadata) in sorted_timelines { - self.load_local_timeline(timeline_id, local_metadata, cause.clone(), ctx) + let ancestor = if let Some(ancestor_id) = local_metadata.ancestor_timeline() { + let timelines = self.timelines.lock().unwrap(); + AncestorArg::ancestor(Arc::clone(timelines.get(&ancestor_id).ok_or_else( + || { + anyhow::anyhow!( + "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}" + ) + }, + )?)) + } else { + AncestorArg::no_ancestor() + }; + let timeline = self + .load_local_timeline(timeline_id, local_metadata, ancestor, cause.clone(), ctx) .await .with_context(|| format!("load local timeline {timeline_id}"))?; + match timeline { + Some(loaded_timeline) => { + let mut timelines = self.timelines.lock().unwrap(); + let overwritten = timelines.insert(timeline_id, Arc::clone(&loaded_timeline)); + if let Some(overwritten) = overwritten { + panic!( + "timeline should not be in the map yet, but is: {timeline_id}: {:?}", + overwritten.current_state() + ); + } + } + None => { + info!(%timeline_id, "timeline is marked as deleted on the remote, load_local_timeline finished the deletion locally"); + // TODO don't we need to restart the tree sort? 
+ } + } } info!("Done"); @@ -1039,6 +1110,7 @@ impl Tenant { &self, timeline_id: TimelineId, local_metadata: TimelineMetadata, + ancestor: AncestorArg, cause: TimelineLoadCause, ctx: &RequestContext, ) -> anyhow::Result>> { @@ -1094,14 +1166,6 @@ impl Tenant { None => None, }; - let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { - let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) - .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; - Some(ancestor_timeline) - } else { - None - }; - let inserted_timeline = self .timeline_init_and_sync( timeline_id, @@ -1241,6 +1305,7 @@ impl Tenant { .load_local_timeline( new_timeline_id, metadata, + AncestorArg::no_ancestor(), TimelineLoadCause::TimelineCreate { placeholder_timeline: Arc::clone(&guard.placeholder_timeline), }, @@ -1250,16 +1315,20 @@ impl Tenant { .context("load newly created on-disk timeline state")? .expect("load_local_timeline should have created the timeline"); - let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) { - Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), - Entry::Occupied(entry) => { - assert_eq!(guard.placeholder_timeline.current_state(), TimelineState::Creating); - assert!(compare_arced_timeline(&real_timeline, entry.get())); - assert_eq!(real_timeline.current_state(), TimelineState::Loading); - assert!(!compare_arced_timeline(&guard.placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline"); - Arc::clone(entry.get()) - } - }; + // don't replace the placeholder timeline, the caller is going to fill + // real_timeline with more data and once that's done, we're ready to + // replace the placeholder + + // let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) { + // 
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), + // Entry::Occupied(entry) => { + // assert_eq!(guard.placeholder_timeline.current_state(), TimelineState::Creating); + // assert!(compare_arced_timeline(&real_timeline, entry.get())); + // assert_eq!(real_timeline.current_state(), TimelineState::Loading); + // assert!(!compare_arced_timeline(&guard.placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline"); + // Arc::clone(entry.get()) + // } + // }; // Do not activate, the caller is responsible for that. // Also, the caller is still responsible for removing the uninit mark file. @@ -1422,6 +1491,7 @@ impl Tenant { ctx, ) .await?; + Ok(AncestorArg::ancestor(ancestor_timeline)) } None => { self.bootstrap_timeline( @@ -1432,17 +1502,16 @@ impl Tenant { ctx, ) .await?; + Ok(AncestorArg::no_ancestor()) } - }; - + } // XXX do we need to remove uninit mark before the self.branch_timeline / self.bootstrap_timeline start the uploads? // If we die with uninit mark present, we'll leak the uploaded state in S3. - Ok(()) }; - let placeholder_timeline = match create_ondisk_state.await { - Ok(()) => { + let (placeholder_timeline, ancestor) = match create_ondisk_state.await { + Ok(ancestor) => { match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() { - Ok(placeholder_timeline) => placeholder_timeline, + Ok(placeholder_timeline) => (placeholder_timeline, ancestor), Err(err) => { error!( "failed to remove uninit marker for new_timeline_id={new_timeline_id}: {err:#}" @@ -1463,26 +1532,35 @@ impl Tenant { // From here on, it's just like during pageserver startup. 
let metadata = load_metadata(self.conf, new_timeline_id, self.tenant_id) .context("load newly created on-disk timeline metadata")?; - self.load_local_timeline( - new_timeline_id, - metadata, - TimelineLoadCause::TimelineCreate { - placeholder_timeline: Arc::clone(&placeholder_timeline), - }, - ctx, - ) - .await - .context("load newly created on-disk timeline state")?; - let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) { - Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), - Entry::Occupied(entry) => { - assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); - assert!(!compare_arced_timeline(&placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline"); - Arc::clone(entry.get()) - } + let real_timeline = self + .load_local_timeline( + new_timeline_id, + metadata, + ancestor, + TimelineLoadCause::TimelineCreate { + placeholder_timeline: Arc::clone(&placeholder_timeline), + }, + ctx, + ) + .await + .context("load newly created on-disk timeline state")?; + + let Some(real_timeline) = real_timeline else { + anyhow::bail!("we just created this timeline's local files, but load_local_timeline did not load it"); }; + match self.timelines.lock().unwrap().entry(new_timeline_id) { + Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), + Entry::Occupied(mut o) => { + info!("replacing placeholder timeline with the real one"); + assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); + assert!(compare_arced_timeline(&placeholder_timeline, o.get())); + let replaced_placeholder = o.insert(Arc::clone(&real_timeline)); + assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline)); + }, + } + real_timeline.activate(broker_client, ctx); Ok(Some(real_timeline)) @@ -2658,19 +2736,21 
@@ impl Tenant { // From here on, it's just like during pageserver startup. let metadata = load_metadata(self.conf, dst_id, self.tenant_id) .context("load newly created on-disk timeline metadata")?; - self.load_local_timeline(dst_id, metadata, TimelineLoadCause::Test, ctx) + + let tline = self + .load_local_timeline( + dst_id, + metadata, + AncestorArg::no_ancestor(), + TimelineLoadCause::Test, + ctx, + ) .instrument(info_span!("load_local_timeline", timeline_id=%dst_id)) .await - .context("load newly created on-disk timeline state")?; + .context("load newly created on-disk timeline state")? + .unwrap(); - let loaded_timeline = self - .timelines - .lock() - .unwrap() - .get(&dst_id) - .cloned() - .expect("we just loaded it"); - loaded_timeline.set_state(TimelineState::Active); + tline.set_state(TimelineState::Active); - Ok(loaded_timeline) + Ok(tline) } diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index bbc7f68262..05448b8d46 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -128,29 +128,24 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build ) # Importing empty file fails + log.info("importing empty_file") empty_file = os.path.join(test_output_dir, "empty_file") with open(empty_file, "w") as _: with pytest.raises(Exception): import_tar(empty_file, empty_file) - # yet, timeline is created and needs to be removed - log.info("deleting timeline") - client.timeline_delete(tenant, timeline) - - log.info("importing corrupt_base_tar") + assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)} # Importing corrupt backup fails + log.info("importing corrupt_base_tar") with pytest.raises(Exception): import_tar(corrupt_base_tar, wal_tar) - # yet, timeline is created and needs to be removed - log.info("deleting timeline") - client.timeline_delete(tenant, timeline) - - log.info("importing base_plus_garbage_tar") + assert timeline not in 
{TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)} # A tar with trailing garbage is currently accepted. It prints a warnings # to the pageserver log, however. Check that. + log.info("importing base_plus_garbage_tar") import_tar(base_plus_garbage_tar, wal_tar) assert env.pageserver.log_contains( ".*WARN.*ignored .* unexpected bytes after the tar archive.*"