From 2db20e55871fdbbe38c2ae7a28b0692a67be4838 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 1 Sep 2022 16:22:22 +0300 Subject: [PATCH] Remove [Un]Loaded timeline code (#2359) --- pageserver/src/http/models.rs | 2 - pageserver/src/http/routes.rs | 59 +-- pageserver/src/layered_repository.rs | 392 +++++++++--------- pageserver/src/layered_repository/timeline.rs | 111 +---- pageserver/src/page_service.rs | 69 +-- pageserver/src/repository.rs | 26 -- pageserver/src/storage_sync.rs | 17 +- pageserver/src/tenant_mgr.rs | 60 +-- pageserver/src/timelines.rs | 17 +- .../src/walreceiver/walreceiver_connection.rs | 2 +- test_runner/regress/test_broken_timeline.py | 6 +- test_runner/regress/test_pageserver_api.py | 5 +- 12 files changed, 290 insertions(+), 476 deletions(-) diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 654f45a95d..7c7d7f7b0c 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -8,7 +8,6 @@ use utils::{ }; // These enums are used in the API response fields. -use crate::repository::LocalTimelineState; use crate::tenant_mgr::TenantState; #[serde_as] @@ -133,7 +132,6 @@ pub struct LocalTimelineInfo { pub current_physical_size: Option, // is None when timeline is Unloaded pub current_logical_size_non_incremental: Option, pub current_physical_size_non_incremental: Option, - pub timeline_state: LocalTimelineState, pub wal_source_connstr: Option, #[serde_as(as = "Option")] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 710014de98..f1033eeb2a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -11,8 +11,7 @@ use super::models::{ StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, TimelineCreateRequest, }; -use crate::layered_repository::{metadata::TimelineMetadata, Timeline}; -use crate::repository::{LocalTimelineState, RepositoryTimeline}; +use crate::layered_repository::Timeline; use crate::storage_sync; use crate::storage_sync::index::{RemoteIndex, RemoteTimeline}; use crate::tenant_config::TenantConfOpt; @@ -74,7 +73,7 @@ fn get_config(request: &Request) -> &'static PageServerConf { // Helper functions to construct a LocalTimelineInfo struct for a timeline -fn local_timeline_info_from_loaded_timeline( +fn local_timeline_info_from_timeline( timeline: &Arc, include_non_incremental_logical_size: bool, include_non_incremental_physical_size: bool, @@ -105,7 +104,6 @@ fn local_timeline_info_from_loaded_timeline( last_record_lsn, prev_record_lsn: Some(timeline.get_prev_record_lsn()), latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(), - timeline_state: LocalTimelineState::Loaded, current_logical_size: Some( timeline .get_current_logical_size() @@ -129,61 +127,20 @@ fn local_timeline_info_from_loaded_timeline( Ok(info) } -fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> LocalTimelineInfo { - LocalTimelineInfo { - ancestor_timeline_id: metadata.ancestor_timeline(), - ancestor_lsn: { - match metadata.ancestor_lsn() { - Lsn(0) => None, - lsn @ Lsn(_) => Some(lsn), - } - }, - disk_consistent_lsn: metadata.disk_consistent_lsn(), - last_record_lsn: metadata.disk_consistent_lsn(), - prev_record_lsn: metadata.prev_record_lsn(), - latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(), - timeline_state: LocalTimelineState::Unloaded, - current_logical_size: None, - current_physical_size: None, - current_logical_size_non_incremental: None, - current_physical_size_non_incremental: None, - wal_source_connstr: None, - last_received_msg_lsn: None, - last_received_msg_ts: None, - } -} - -fn local_timeline_info_from_repo_timeline( - repo_timeline: &RepositoryTimeline, - include_non_incremental_logical_size: bool, - include_non_incremental_physical_size: bool, -) -> anyhow::Result { - match repo_timeline { - RepositoryTimeline::Loaded(timeline) => local_timeline_info_from_loaded_timeline( - timeline, - include_non_incremental_logical_size, - include_non_incremental_physical_size, - ), - RepositoryTimeline::Unloaded { metadata } => { - Ok(local_timeline_info_from_unloaded_timeline(metadata)) - } - } -} - fn list_local_timelines( tenant_id: ZTenantId, include_non_incremental_logical_size: bool, include_non_incremental_physical_size: bool, ) -> Result> { let repo = tenant_mgr::get_repository_for_tenant(tenant_id) - .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?; + .with_context(|| format!("Failed to get repo for tenant {tenant_id}"))?; let repo_timelines = repo.list_timelines(); let mut local_timeline_info = Vec::with_capacity(repo_timelines.len()); for (timeline_id, repository_timeline) in repo_timelines { local_timeline_info.push(( timeline_id, - local_timeline_info_from_repo_timeline( + local_timeline_info_from_timeline( &repository_timeline, include_non_incremental_logical_size, include_non_incremental_physical_size, @@ -214,12 +171,12 @@ async fn timeline_create_handler(mut request: Request) -> Result { + Ok(Some(new_timeline)) => { // Created. Construct a TimelineInfo for it. - let local_info = local_timeline_info_from_loaded_timeline(&new_timeline, false, false)?; + let local_info = local_timeline_info_from_timeline(&new_timeline, false, false)?; Ok(Some(TimelineInfo { tenant_id, - timeline_id: new_timeline_id, + timeline_id: new_timeline.timeline_id, local: Some(local_info), remote: None, })) @@ -311,7 +268,7 @@ async fn timeline_detail_handler(request: Request) -> Result>, tenant_id: ZTenantId, - timelines: Mutex>, + timelines: Mutex>>, // This mutex prevents creation of new timelines during GC. // Adding yet another mutex (in addition to `timelines`) is needed because holding // `timelines` mutex during all GC iteration (especially with enforced checkpoint) @@ -126,37 +128,18 @@ pub struct Repository { impl Repository { /// Get Timeline handle for given zenith timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline(&self, timelineid: ZTimelineId) -> Option> { - self.timelines - .lock() - .unwrap() - .get(&timelineid) - .cloned() - .map(RepositoryTimeline::from) - } - - /// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded. - pub fn get_timeline_load(&self, timeline_id: ZTimelineId) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); - match self.get_timeline_load_internal(timeline_id, &mut timelines)? { - Some(local_loaded_timeline) => Ok(local_loaded_timeline), - None => anyhow::bail!("cannot get local timeline, unknown timeline id: {timeline_id}"), - } + pub fn get_timeline(&self, timeline_id: ZTimelineId) -> Option> { + self.timelines.lock().unwrap().get(&timeline_id).cloned() } /// Lists timelines the repository contains. /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. - pub fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> { + pub fn list_timelines(&self) -> Vec<(ZTimelineId, Arc)> { self.timelines .lock() .unwrap() .iter() - .map(|(timeline_id, timeline_entry)| { - ( - *timeline_id, - RepositoryTimeline::from(timeline_entry.clone()), - ) - }) + .map(|(timeline_id, timeline_entry)| (*timeline_id, Arc::clone(timeline_entry))) .collect() } @@ -164,16 +147,18 @@ impl Repository { /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. pub fn create_empty_timeline( &self, - timeline_id: ZTimelineId, + new_timeline_id: ZTimelineId, initdb_lsn: Lsn, ) -> Result> { + // XXX: keep the lock to avoid races during timeline creation let mut timelines = self.timelines.lock().unwrap(); - let vacant_timeline_entry = match timelines.entry(timeline_id) { - Entry::Occupied(_) => bail!("Timeline already exists"), - Entry::Vacant(vacant_entry) => vacant_entry, - }; - let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id); + anyhow::ensure!( + timelines.get(&new_timeline_id).is_none(), + "Timeline {new_timeline_id} already exists" + ); + + let timeline_path = self.conf.timeline_path(&new_timeline_id, &self.tenant_id); if timeline_path.exists() { bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.") } @@ -181,31 +166,25 @@ impl Repository { // Create the timeline directory, and write initial metadata to file. crashsafe_dir::create_dir_all(timeline_path)?; - let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn); - save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?; - - let timeline = Timeline::new( + let new_metadata = + TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn); + save_metadata( self.conf, - Arc::clone(&self.tenant_conf), - metadata, - None, - timeline_id, + new_timeline_id, self.tenant_id, - Arc::clone(&self.walredo_mgr), - self.upload_layers, - ); - timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); + &new_metadata, + true, + )?; - // Insert if not exists - let timeline = Arc::new(timeline); - vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline))); + let new_timeline = + self.initialize_new_timeline(new_timeline_id, new_metadata, &mut timelines)?; + new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); - crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach { - id: ZTenantTimelineId::new(self.tenant_id(), timeline_id), - timeline: Arc::clone(&timeline), - }); + if let hash_map::Entry::Vacant(v) = timelines.entry(new_timeline_id) { + v.insert(Arc::clone(&new_timeline)); + } - Ok(timeline) + Ok(new_timeline) } /// Branch a timeline @@ -214,7 +193,7 @@ impl Repository { src: ZTimelineId, dst: ZTimelineId, start_lsn: Option, - ) -> Result<()> { + ) -> Result> { // We need to hold this lock to prevent GC from starting at the same time. GC scans the directory to learn // about timelines, so otherwise a race condition is possible, where we create new timeline and GC // concurrently removes data that is needed by the new timeline. @@ -229,12 +208,12 @@ impl Repository { // Step 2 is to avoid initializing the new branch using data removed by past GC iterations // or in-queue GC iterations. + // XXX: keep the lock to avoid races during timeline creation let mut timelines = self.timelines.lock().unwrap(); - let src_timeline = self - .get_timeline_load_internal(src, &mut timelines) + let src_timeline = timelines + .get(&src) // message about timeline being remote is one .context up in the stack - .context("failed to load timeline for branching")? - .ok_or_else(|| anyhow::anyhow!("unknown timeline id: {}", &src))?; + .ok_or_else(|| anyhow::anyhow!("unknown timeline id: {src}"))?; let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn(); @@ -252,7 +231,7 @@ impl Repository { .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn) .context(format!( "invalid branch start lsn: less than latest GC cutoff {}", - *latest_gc_cutoff_lsn + *latest_gc_cutoff_lsn, ))?; { let gc_info = src_timeline.gc_info.read().unwrap(); @@ -293,11 +272,13 @@ impl Repository { ); crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?; save_metadata(self.conf, dst, self.tenant_id, &metadata, true)?; - timelines.insert(dst, LayeredTimelineEntry::Unloaded { id: dst, metadata }); - info!("branched timeline {} from {} at {}", dst, src, start_lsn); + let new_timeline = self.initialize_new_timeline(dst, metadata, &mut timelines)?; + timelines.insert(dst, Arc::clone(&new_timeline)); - Ok(()) + info!("branched timeline {dst} from {src} at {start_lsn}"); + + Ok(new_timeline) } /// perform one garbage collection iteration, removing old data files from disk. @@ -346,14 +327,7 @@ impl Repository { for (timelineid, timeline) in &timelines_to_compact { let _entered = info_span!("compact", timeline = %timelineid, tenant = %self.tenant_id).entered(); - match timeline { - LayeredTimelineEntry::Loaded(timeline) => { - timeline.compact()?; - } - LayeredTimelineEntry::Unloaded { .. } => { - debug!("Cannot compact remote timeline {}", timelineid) - } - } + timeline.compact()?; } Ok(()) @@ -371,15 +345,7 @@ impl Repository { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() - // filter to get only loaded timelines - .filter_map(|(timelineid, entry)| match entry { - LayeredTimelineEntry::Loaded(timeline) => Some((timelineid, timeline)), - LayeredTimelineEntry::Unloaded { .. } => { - debug!("Skipping checkpoint for unloaded timeline {}", timelineid); - None - } - }) - .map(|(timelineid, timeline)| (*timelineid, timeline.clone())) + .map(|(timelineid, timeline)| (*timelineid, Arc::clone(timeline))) .collect::>(); drop(timelines); @@ -403,7 +369,7 @@ impl Repository { // because detach removes files, which will break child branches let children_exist = timelines .iter() - .any(|(_, entry)| entry.ancestor_timeline_id() == Some(timeline_id)); + .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); ensure!( !children_exist, @@ -431,19 +397,36 @@ impl Repository { Ok(()) } - /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization. - /// See [`crate::remote_storage`] for more details about the synchronization. - pub fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> { - debug!("attach timeline_id: {}", timeline_id,); - match self.timelines.lock().unwrap().entry(timeline_id) { - Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."), - Entry::Vacant(entry) => { - // we need to get metadata of a timeline, another option is to pass it along with Downloaded status - let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?; - // finally we make newly downloaded timeline visible to repository - entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata }) - }, + pub fn init_attach_timelines( + &self, + timelines: Vec<(ZTimelineId, TimelineMetadata)>, + ) -> anyhow::Result<()> { + let sorted_timelines = if timelines.len() == 1 { + timelines + } else if !timelines.is_empty() { + tree_sort_timelines(timelines)? + } else { + warn!("No timelines to attach received"); + return Ok(()); }; + + let mut timelines_accessor = self.timelines.lock().unwrap(); + for (timeline_id, metadata) in sorted_timelines { + let timeline = self + .initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor) + .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?; + + match timelines_accessor.entry(timeline.timeline_id) { + hash_map::Entry::Occupied(_) => anyhow::bail!( + "Found freshly initialized timeline {} in the tenant map", + timeline.timeline_id + ), + hash_map::Entry::Vacant(v) => { + v.insert(timeline); + } + } + } + Ok(()) } @@ -453,6 +436,49 @@ impl Repository { } } +/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), +/// perform a topological sort, so that the parent of each timeline comes +/// before the children. +fn tree_sort_timelines( + timelines: Vec<(ZTimelineId, TimelineMetadata)>, +) -> Result> { + let mut result = Vec::with_capacity(timelines.len()); + + let mut now = Vec::with_capacity(timelines.len()); + // (ancestor, children) + let mut later: HashMap> = + HashMap::with_capacity(timelines.len()); + + for (timeline_id, metadata) in timelines { + if let Some(ancestor_id) = metadata.ancestor_timeline() { + let children = later.entry(ancestor_id).or_default(); + children.push((timeline_id, metadata)); + } else { + now.push((timeline_id, metadata)); + } + } + + while let Some((timeline_id, metadata)) = now.pop() { + result.push((timeline_id, metadata)); + // All children of this can be loaded now + if let Some(mut children) = later.remove(&timeline_id) { + now.append(&mut children); + } + } + + // All timelines should be visited now. Unless there were timelines with missing ancestors. + if !later.is_empty() { + for (missing_id, orphan_ids) in later { + for (orphan_id, _) in orphan_ids { + error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded"); + } + } + bail!("could not load tenant because some timelines are missing ancestors"); + } + + Ok(result) +} + /// Private functions impl Repository { pub fn get_checkpoint_distance(&self) -> u64 { @@ -548,87 +574,49 @@ impl Repository { Ok(()) } - // Implementation of the public `get_timeline_load` function. - // Differences from the public: - // * interface in that the caller must already hold the mutex on the 'timelines' hashmap. - fn get_timeline_load_internal( + fn initialize_new_timeline( &self, - timeline_id: ZTimelineId, - timelines: &mut HashMap, - ) -> anyhow::Result>> { - Ok(match timelines.get(&timeline_id) { - Some(entry) => match entry { - LayeredTimelineEntry::Loaded(local_timeline) => { - debug!("timeline {timeline_id} found loaded into memory"); - Some(Arc::clone(local_timeline)) - } - LayeredTimelineEntry::Unloaded { .. } => { - debug!( - "timeline {timeline_id} found on a local disk, but not loaded into the memory, loading" - ); - let timeline = self.load_local_timeline(timeline_id, timelines)?; - let was_loaded = timelines.insert( - timeline_id, - LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), - ); - ensure!( - was_loaded.is_none() - || matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })), - "assertion failure, inserted wrong timeline in an incorrect state" - ); - Some(timeline) - } - }, - None => { - debug!("timeline {timeline_id} not found"); - None - } - }) - } - - fn load_local_timeline( - &self, - timeline_id: ZTimelineId, - timelines: &mut HashMap, + new_timeline_id: ZTimelineId, + new_metadata: TimelineMetadata, + timelines: &mut MutexGuard>>, ) -> anyhow::Result> { - let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) - .context("failed to load metadata")?; - let disk_consistent_lsn = metadata.disk_consistent_lsn(); + let ancestor = match new_metadata.ancestor_timeline() { + Some(ancestor_timeline_id) => Some( + timelines + .get(&ancestor_timeline_id) + .cloned() + .with_context(|| { + format!( + "Timeline's {new_timeline_id} ancestor {ancestor_timeline_id} was not found" + ) + })?, + ), + None => None, + }; - let ancestor = metadata - .ancestor_timeline() - .map(|ancestor_timeline_id| { - trace!("loading {timeline_id}'s ancestor {}", &ancestor_timeline_id); - self.get_timeline_load_internal(ancestor_timeline_id, timelines) - }) - .transpose() - .context("cannot load ancestor timeline")? - .flatten() - .map(LayeredTimelineEntry::Loaded); - let _enter = info_span!("loading local timeline").entered(); + let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn(); - let timeline = Timeline::new( + let new_timeline = Arc::new(Timeline::new( self.conf, Arc::clone(&self.tenant_conf), - metadata, + new_metadata, ancestor, - timeline_id, + new_timeline_id, self.tenant_id, Arc::clone(&self.walredo_mgr), self.upload_layers, - ); - timeline - .load_layer_map(disk_consistent_lsn) + )); + + new_timeline + .load_layer_map(new_disk_consistent_lsn) .context("failed to load layermap")?; - let timeline = Arc::new(timeline); - crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach { - id: ZTenantTimelineId::new(self.tenant_id(), timeline_id), - timeline: Arc::clone(&timeline), + id: ZTenantTimelineId::new(self.tenant_id(), new_timeline_id), + timeline: Arc::clone(&new_timeline), }); - Ok(timeline) + Ok(new_timeline) } pub fn new( @@ -775,18 +763,20 @@ impl Repository { // This is unresolved question for now, how to do gc in presence of remote timelines // especially when this is combined with branching. // Somewhat related: https://github.com/zenithdb/zenith/issues/999 - if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { + if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() { // If target_timeline is specified, we only need to know branchpoints of its children if let Some(timelineid) = target_timeline_id { if ancestor_timeline_id == &timelineid { - all_branchpoints - .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); + all_branchpoints.insert(( + *ancestor_timeline_id, + timeline_entry.get_ancestor_lsn(), + )); } } // Collect branchpoints for all timelines else { all_branchpoints - .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); + .insert((*ancestor_timeline_id, timeline_entry.get_ancestor_lsn())); } } @@ -801,7 +791,9 @@ impl Repository { let mut gc_timelines = Vec::with_capacity(timeline_ids.len()); for timeline_id in timeline_ids { // Timeline is known to be local and loaded. - let timeline = self.get_timeline_load(timeline_id)?; + let timeline = self + .get_timeline(timeline_id) + .with_context(|| format!("Timeline {timeline_id} was not found"))?; // If target_timeline is specified, ignore all other timelines if let Some(target_timelineid) = target_timeline_id { @@ -1031,20 +1023,21 @@ pub mod repo_harness { false, ); // populate repo with locally available timelines + let mut timelines_to_load = Vec::new(); for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id)) .expect("should be able to read timelines dir") { - let timeline_dir_entry = timeline_dir_entry.unwrap(); + let timeline_dir_entry = timeline_dir_entry?; let timeline_id: ZTimelineId = timeline_dir_entry .path() .file_name() .unwrap() .to_string_lossy() - .parse() - .unwrap(); - - repo.attach_timeline(timeline_id)?; + .parse()?; + let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?; + timelines_to_load.push((timeline_id, timeline_metadata)); } + repo.init_attach_timelines(timelines_to_load)?; Ok(repo) } @@ -1127,7 +1120,10 @@ mod tests { match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) { Ok(_) => panic!("duplicate timeline creation should fail"), - Err(e) => assert_eq!(e.to_string(), "Timeline already exists"), + Err(e) => assert_eq!( + e.to_string(), + format!("Timeline {TIMELINE_ID} already exists") + ), } Ok(()) @@ -1170,7 +1166,7 @@ mod tests { // Branch the history, modify relation differently on the new timeline repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; let newtline = repo - .get_timeline_load(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); let new_writer = newtline.writer(); new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; @@ -1318,7 +1314,7 @@ mod tests { repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = repo - .get_timeline_load(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; @@ -1334,7 +1330,7 @@ mod tests { repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = repo - .get_timeline_load(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -1363,17 +1359,8 @@ mod tests { } let repo = harness.load(); - let tline = repo - .get_timeline(TIMELINE_ID) + repo.get_timeline(TIMELINE_ID) .expect("cannot load timeline"); - assert!(matches!(tline, RepositoryTimeline::Unloaded { .. })); - - assert!(repo.get_timeline_load(TIMELINE_ID).is_ok()); - - let tline = repo - .get_timeline(TIMELINE_ID) - .expect("cannot load timeline"); - assert!(matches!(tline, RepositoryTimeline::Loaded(_))); Ok(()) } @@ -1393,7 +1380,7 @@ mod tests { repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = repo - .get_timeline_load(NEW_TIMELINE_ID) + .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; @@ -1402,28 +1389,15 @@ mod tests { // check that both of them are initially unloaded let repo = harness.load(); - { - let tline = repo.get_timeline(TIMELINE_ID).expect("cannot get timeline"); - assert!(matches!(tline, RepositoryTimeline::Unloaded { .. })); - - let tline = repo - .get_timeline(NEW_TIMELINE_ID) - .expect("cannot get timeline"); - assert!(matches!(tline, RepositoryTimeline::Unloaded { .. })); - } - // load only child timeline - let _ = repo - .get_timeline_load(NEW_TIMELINE_ID) - .expect("cannot load timeline"); // check that both, child and ancestor are loaded - let tline = repo + let _child_tline = repo .get_timeline(NEW_TIMELINE_ID) - .expect("cannot get timeline"); - assert!(matches!(tline, RepositoryTimeline::Loaded(_))); + .expect("cannot get child timeline loaded"); - let tline = repo.get_timeline(TIMELINE_ID).expect("cannot get timeline"); - assert!(matches!(tline, RepositoryTimeline::Loaded(_))); + let _ancestor_tline = repo + .get_timeline(TIMELINE_ID) + .expect("cannot get ancestor timeline loaded"); Ok(()) } @@ -1447,7 +1421,9 @@ mod tests { std::fs::write(metadata_path, metadata_bytes)?; let err = harness.try_load().err().expect("should fail"); - assert_eq!(err.to_string(), "failed to load local metadata"); + assert!(err + .to_string() + .starts_with("Failed to parse metadata bytes from path")); let mut found_error_message = false; let mut err_source = err.source(); @@ -1663,7 +1639,9 @@ mod tests { for _ in 0..50 { let new_tline_id = ZTimelineId::generate(); repo.branch_timeline(tline_id, new_tline_id, Some(lsn))?; - tline = repo.get_timeline_load(new_tline_id)?; + tline = repo + .get_timeline(new_tline_id) + .expect("Should have the branched timeline"); tline_id = new_tline_id; for _ in 0..NUM_KEYS { @@ -1722,7 +1700,9 @@ mod tests { for idx in 0..NUM_TLINES { let new_tline_id = ZTimelineId::generate(); repo.branch_timeline(tline_id, new_tline_id, Some(lsn))?; - tline = repo.get_timeline_load(new_tline_id)?; + tline = repo + .get_timeline(new_tline_id) + .expect("Should have the branched timeline"); tline_id = new_tline_id; for _ in 0..NUM_KEYS { @@ -1749,11 +1729,11 @@ mod tests { if lsn.0 == 0 { continue; } - println!("chekcking [{}][{}] at {}", idx, blknum, lsn); + println!("checking [{idx}][{blknum}] at {lsn}"); test_key.field6 = blknum as u32; assert_eq!( tline.get(test_key, *lsn)?, - TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn)) + TEST_IMG(&format!("{idx} {blknum} at {lsn}")) ); } } diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index fd719812a3..821995fad1 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -50,7 +50,7 @@ use utils::{ zid::{ZTenantId, ZTimelineId}, }; -use crate::repository::{GcResult, RepositoryTimeline}; +use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::thread_mgr; use crate::walreceiver::IS_WAL_RECEIVER; @@ -164,72 +164,6 @@ static PERSISTENT_BYTES_WRITTEN: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -#[derive(Clone)] -pub enum LayeredTimelineEntry { - Loaded(Arc), - Unloaded { - id: ZTimelineId, - metadata: TimelineMetadata, - }, -} - -impl LayeredTimelineEntry { - fn timeline_id(&self) -> ZTimelineId { - match self { - LayeredTimelineEntry::Loaded(timeline) => timeline.timeline_id, - LayeredTimelineEntry::Unloaded { id, .. } => *id, - } - } - - pub fn ancestor_timeline_id(&self) -> Option { - match self { - LayeredTimelineEntry::Loaded(timeline) => { - timeline.ancestor_timeline.as_ref().map(|t| t.timeline_id()) - } - LayeredTimelineEntry::Unloaded { metadata, .. } => metadata.ancestor_timeline(), - } - } - - pub fn ancestor_lsn(&self) -> Lsn { - match self { - LayeredTimelineEntry::Loaded(timeline) => timeline.ancestor_lsn, - LayeredTimelineEntry::Unloaded { metadata, .. } => metadata.ancestor_lsn(), - } - } - - fn ensure_loaded(&self) -> anyhow::Result<&Arc> { - match self { - LayeredTimelineEntry::Loaded(timeline) => Ok(timeline), - LayeredTimelineEntry::Unloaded { .. } => { - anyhow::bail!("timeline is unloaded") - } - } - } - - pub fn layer_removal_guard(&self) -> Result>, anyhow::Error> { - match self { - LayeredTimelineEntry::Loaded(timeline) => timeline - .layer_removal_cs - .try_lock() - .map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}")) - .map(Some), - - LayeredTimelineEntry::Unloaded { .. } => Ok(None), - } - } -} - -impl From for RepositoryTimeline { - fn from(entry: LayeredTimelineEntry) -> Self { - match entry { - LayeredTimelineEntry::Loaded(timeline) => RepositoryTimeline::Loaded(timeline as _), - LayeredTimelineEntry::Unloaded { metadata, .. } => { - RepositoryTimeline::Unloaded { metadata } - } - } - } -} - struct TimelineMetrics { pub reconstruct_time_histo: Histogram, pub materialized_page_cache_hit_counter: GenericCounter, @@ -342,7 +276,7 @@ pub struct Timeline { // Parent timeline that this timeline was branched from, and the LSN // of the branch point. - ancestor_timeline: Option, + ancestor_timeline: Option>, ancestor_lsn: Lsn, // Metrics @@ -566,7 +500,7 @@ impl Timeline { pub fn get_ancestor_timeline_id(&self) -> Option { self.ancestor_timeline .as_ref() - .map(LayeredTimelineEntry::timeline_id) + .map(|ancestor| ancestor.timeline_id) } /// Lock and get timeline's GC cuttof @@ -781,7 +715,7 @@ impl Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, metadata: TimelineMetadata, - ancestor: Option, + ancestor: Option>, timeline_id: ZTimelineId, tenant_id: ZTenantId, walredo_mgr: Arc, @@ -938,6 +872,12 @@ impl Timeline { Ok(()) } + pub fn layer_removal_guard(&self) -> Result, anyhow::Error> { + self.layer_removal_cs + .try_lock() + .map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}")) + } + /// Retrieve current logical size of the timeline. /// /// The size could be lagging behind the actual number, in case @@ -1204,24 +1144,13 @@ impl Timeline { } fn get_ancestor_timeline(&self) -> Result> { - let ancestor = self - .ancestor_timeline - .as_ref() - .with_context(|| { - format!( - "Ancestor is missing. Timeline id: {} Ancestor id {:?}", - self.timeline_id, - self.get_ancestor_timeline_id(), - ) - })? - .ensure_loaded() - .with_context(|| { - format!( - "Ancestor timeline is not loaded. Timeline id: {} Ancestor id {:?}", - self.timeline_id, - self.get_ancestor_timeline_id(), - ) - })?; + let ancestor = self.ancestor_timeline.as_ref().with_context(|| { + format!( + "Ancestor is missing. Timeline id: {} Ancestor id {:?}", + self.timeline_id, + self.get_ancestor_timeline_id(), + ) + })?; Ok(Arc::clone(ancestor)) } @@ -1251,7 +1180,9 @@ impl Timeline { layer = Arc::clone(open_layer); } else { // No writeable layer yet. Create one. - let start_lsn = layers.next_open_layer_at.unwrap(); + let start_lsn = layers + .next_open_layer_at + .context("No next open layer found")?; trace!( "creating layer for write at {}/{} for record at {}", @@ -1496,7 +1427,7 @@ impl Timeline { let ancestor_timelineid = self .ancestor_timeline .as_ref() - .map(LayeredTimelineEntry::timeline_id); + .map(|ancestor| ancestor.timeline_id); let metadata = TimelineMetadata::new( disk_consistent_lsn, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d59a82d488..7f7fa3c22b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -457,18 +457,18 @@ impl PageServerHandler { fn handle_pagerequests( &self, pgb: &mut PostgresBackend, - timelineid: ZTimelineId, - tenantid: ZTenantId, + timeline_id: ZTimelineId, + tenant_id: ZTenantId, ) -> anyhow::Result<()> { - let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered(); + let _enter = + info_span!("pagestream", timeline = %timeline_id, tenant = %tenant_id).entered(); // NOTE: pagerequests handler exits when connection is closed, // so there is no need to reset the association - thread_mgr::associate_with(Some(tenantid), Some(timelineid)); + thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Check that the timeline exists - let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Cannot load local timeline")?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; /* switch client to COPYBOTH */ pgb.write_message(&BeMessage::CopyBothResponse)?; @@ -488,8 +488,8 @@ impl PageServerHandler { }; let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; - let tenant_id = tenantid.to_string(); - let timeline_id = timelineid.to_string(); + let tenant_id = tenant_id.to_string(); + let timeline_id = timeline_id.to_string(); let response = match zenith_fe_msg { PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME @@ -599,7 +599,9 @@ impl PageServerHandler { info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered(); let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - let timeline = repo.get_timeline_load(timeline_id)?; + let timeline = repo + .get_timeline(timeline_id) + .with_context(|| format!("Timeline {timeline_id} was not found"))?; ensure!(timeline.get_last_record_lsn() == start_lsn); // TODO leave clean state on error. For now you can use detach to clean @@ -762,19 +764,18 @@ impl PageServerHandler { fn handle_basebackup_request( &self, pgb: &mut PostgresBackend, - timelineid: ZTimelineId, + timeline_id: ZTimelineId, lsn: Option, prev_lsn: Option, - tenantid: ZTenantId, + tenant_id: ZTenantId, full_backup: bool, ) -> anyhow::Result<()> { - let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty); + let span = info_span!("basebackup", timeline = %timeline_id, tenant = %tenant_id, lsn = field::Empty); let _enter = span.enter(); info!("starting"); // check that the timeline exists - let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Cannot load local timeline")?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { timeline @@ -906,12 +907,11 @@ impl postgres_backend::Handler for PageServerHandler { "invalid param number for get_last_record_rlsn command" ); - let tenantid = ZTenantId::from_str(params[0])?; - let timelineid = ZTimelineId::from_str(params[1])?; + let tenant_id = ZTenantId::from_str(params[0])?; + let timeline_id = ZTimelineId::from_str(params[1])?; - self.check_permission(Some(tenantid))?; - let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Cannot load local timeline")?; + self.check_permission(Some(tenant_id))?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; let end_of_timeline = timeline.get_last_record_rlsn(); @@ -1134,10 +1134,9 @@ impl postgres_backend::Handler for PageServerHandler { .captures(query_string) .with_context(|| format!("Invalid compact: '{}'", query_string))?; - let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; - let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Couldn't load timeline")?; + let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; timeline.compact()?; pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? @@ -1152,11 +1151,9 @@ impl postgres_backend::Handler for PageServerHandler { .captures(query_string) .with_context(|| format!("invalid checkpoint command: '{}'", query_string))?; - let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; - let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - - let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Cannot load local timeline")?; + let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; // Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`). timeline.checkpoint(CheckpointConfig::Forced)?; @@ -1172,10 +1169,9 @@ impl postgres_backend::Handler for PageServerHandler { .captures(query_string) .with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?; - let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; - let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) - .context("Cannot load local timeline")?; + let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?; let timestamp_pg = to_pg_timestamp(timestamp); @@ -1201,6 +1197,15 @@ impl postgres_backend::Handler for PageServerHandler { } } +fn get_local_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Result> { + tenant_mgr::get_repository_for_tenant(tenant_id) + .and_then(|repo| { + repo.get_timeline(timeline_id) + .context("No timeline in tenant's repository") + }) + .with_context(|| format!("Could not get timeline {timeline_id} in tenant {tenant_id}")) +} + /// /// A std::io::Write implementation that wraps all data written to it in CopyData /// messages. diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index e46a39436d..c3b08c93de 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,4 +1,3 @@ -use crate::layered_repository::metadata::TimelineMetadata; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, Result}; use byteorder::{ByteOrder, BE}; @@ -6,7 +5,6 @@ use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::fmt; use std::ops::{AddAssign, Range}; -use std::sync::Arc; use std::time::Duration; #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] @@ -175,30 +173,6 @@ impl Value { } } -/// A timeline, that belongs to the current repository. -pub enum RepositoryTimeline { - /// Timeline, with its files present locally in pageserver's working directory. - /// Loaded into pageserver's memory and ready to be used. - Loaded(Arc), - - /// All the data is available locally, but not loaded into memory, so loading have to be done before actually using the timeline - Unloaded { - // It is ok to keep metadata here, because it is not changed when timeline is unloaded. - // FIXME can s3 sync actually change it? It can change it when timeline is in awaiting download state. - // but we currently do not download something for the timeline once it is local (even if there are new checkpoints) is it correct? - // also it is not that good to keep TimelineMetadata here, because it is layered repo implementation detail - metadata: TimelineMetadata, - }, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum LocalTimelineState { - // timeline is loaded into memory (with layer map and all the bits), - Loaded, - // timeline is on disk locally and ready to be loaded into memory. - Unloaded, -} - /// /// Result of performing GC /// diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index a52cde7286..0bdc30a73f 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -903,8 +903,10 @@ fn storage_sync_loop( "Sync loop step completed, {} new tenant state update(s)", updated_tenants.len() ); - let mut sync_status_updates: HashMap> = - HashMap::new(); + let mut timelines_to_attach: HashMap< + ZTenantId, + Vec<(ZTimelineId, TimelineMetadata)>, + > = HashMap::new(); let index_accessor = runtime.block_on(index.read()); for tenant_id in updated_tenants { let tenant_entry = match index_accessor.tenant_entry(&tenant_id) { @@ -930,13 +932,18 @@ fn storage_sync_loop( // and register them all at once in a repository for download // to be submitted in a single operation to repository // so it can apply them at once to internal timeline map. - sync_status_updates - .insert(tenant_id, tenant_entry.keys().copied().collect()); + timelines_to_attach.insert( + tenant_id, + tenant_entry + .iter() + .map(|(&id, entry)| (id, entry.metadata.clone())) + .collect(), + ); } } drop(index_accessor); // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. - attach_downloaded_tenants(conf, &index, sync_status_updates); + attach_downloaded_tenants(conf, &index, timelines_to_attach); } } ControlFlow::Break(()) => { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index fec8a80b9b..cbf9f2094a 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,6 +3,7 @@ use crate::config::PageServerConf; use crate::http::models::TenantInfo; +use crate::layered_repository::metadata::TimelineMetadata; use crate::layered_repository::{load_metadata, Repository, Timeline}; use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; @@ -14,7 +15,7 @@ use anyhow::Context; use remote_storage::GenericRemoteStorage; use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt; use std::sync::Arc; use tokio::sync::mpsc; @@ -192,7 +193,7 @@ impl std::fmt::Debug for LocalTimelineUpdate { pub fn attach_downloaded_tenants( conf: &'static PageServerConf, remote_index: &RemoteIndex, - sync_status_updates: HashMap>, + sync_status_updates: HashMap>, ) { if sync_status_updates.is_empty() { debug!("No sync status updates to apply"); @@ -212,11 +213,9 @@ pub fn attach_downloaded_tenants( continue; } }; - match attach_downloaded_tenant(&repo, downloaded_timelines) { - Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"), - Err(e) => error!( - "Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}" - ), + match repo.init_attach_timelines(downloaded_timelines) { + Ok(()) => info!("successfully loaded local timelines for tenant {tenant_id}"), + Err(e) => error!("Failed to load local timelines for tenant {tenant_id}: {e:?}"), } } } @@ -371,15 +370,6 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result anyhow::Result> { - get_repository_for_tenant(tenant_id)?.get_timeline_load(timeline_id) -} - pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> { // Start with the shutdown of timeline tasks (this shuts down the walreceiver) // It is important that we do not take locks here, and do not check whether the timeline exists @@ -499,7 +489,7 @@ fn check_broken_timeline( conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id: ZTimelineId, -) -> anyhow::Result<()> { +) -> anyhow::Result { let metadata = load_metadata(conf, timeline_id, tenant_id).context("failed to load metadata")?; @@ -509,7 +499,7 @@ fn check_broken_timeline( anyhow::bail!("Timeline {timeline_id} has a zero disk consistent LSN."); } - Ok(()) + Ok(metadata) } /// Note: all timelines are attached at once if and only if all of them are locally complete @@ -519,14 +509,14 @@ fn init_local_repository( local_timeline_init_statuses: HashMap, remote_index: &RemoteIndex, ) -> anyhow::Result<(), anyhow::Error> { - let mut timelines_to_attach = HashSet::new(); + let mut timelines_to_attach = Vec::new(); for (timeline_id, init_status) in local_timeline_init_statuses { match init_status { LocalTimelineInitStatus::LocallyComplete => { debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository"); - check_broken_timeline(conf, tenant_id, timeline_id) + let metadata = check_broken_timeline(conf, tenant_id, timeline_id) .context("found broken timeline")?; - timelines_to_attach.insert(timeline_id); + timelines_to_attach.push((timeline_id, metadata)); } LocalTimelineInitStatus::NeedsSync => { debug!( @@ -545,32 +535,8 @@ fn init_local_repository( // Lets fail here loudly to be on the safe side. // XXX: It may be a better api to actually distinguish between repository startup // and processing of newly downloaded timelines. - attach_downloaded_tenant(&repo, timelines_to_attach) - .with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?; - Ok(()) -} - -fn attach_downloaded_tenant( - repo: &Repository, - downloaded_timelines: HashSet, -) -> anyhow::Result<()> { - // first, register timeline metadata to ensure ancestors will be found later during layer load - for &timeline_id in &downloaded_timelines { - repo.attach_timeline(timeline_id).with_context(|| { - format!("Failed to load timeline {timeline_id} into in-memory repository") - })?; - } - - // and then load its layers in memory - for timeline_id in downloaded_timelines { - repo.get_timeline_load(timeline_id).with_context(|| { - format!( - "Failed to register add local timeline for tenant {}", - repo.tenant_id(), - ) - })?; - } - + repo.init_attach_timelines(timelines_to_attach) + .with_context(|| format!("Failed to init local timelines for tenant {tenant_id}"))?; Ok(()) } diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 4f760751db..936699c2ec 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -108,7 +108,7 @@ fn bootstrap_timeline( tenantid: ZTenantId, tli: ZTimelineId, repo: &Repository, -) -> Result<()> { +) -> Result> { let initdb_path = conf .tenant_path(&tenantid) .join(format!("tmp-timeline-{}", tli)); @@ -141,7 +141,7 @@ fn bootstrap_timeline( // Remove temp dir. We don't need it anymore fs::remove_dir_all(pgdata_path)?; - Ok(()) + Ok(timeline) } /// @@ -159,7 +159,7 @@ pub(crate) fn create_timeline( new_timeline_id: Option, ancestor_timeline_id: Option, mut ancestor_start_lsn: Option, -) -> Result)>> { +) -> Result>> { let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate); let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; @@ -168,11 +168,11 @@ pub(crate) fn create_timeline( return Ok(None); } - match ancestor_timeline_id { + let loaded_timeline = match ancestor_timeline_id { Some(ancestor_timeline_id) => { let ancestor_timeline = repo - .get_timeline_load(ancestor_timeline_id) - .context("Cannot branch off the timeline that's not present locally")?; + .get_timeline(ancestor_timeline_id) + .context("Cannot branch off the timeline that's not present in pageserver")?; if let Some(lsn) = ancestor_start_lsn.as_mut() { // Wait for the WAL to arrive and be processed on the parent branch up @@ -201,8 +201,5 @@ pub(crate) fn create_timeline( None => bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?, }; - // load the timeline into memory - let loaded_timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?; - - Ok(Some((new_timeline_id, loaded_timeline))) + Ok(Some(loaded_timeline)) } diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 2c29a56ad2..d441bbb4ab 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -132,7 +132,7 @@ pub async fn handle_walreceiver_connection( let (repo, timeline) = tokio::task::spawn_blocking(move || { let repo = tenant_mgr::get_repository_for_tenant(tenant_id) .with_context(|| format!("no repository found for tenant {tenant_id}"))?; - let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id) + let timeline = repo.get_timeline(timeline_id) .with_context(|| { format!("local timeline {timeline_id} not found for tenant {tenant_id}") })?; diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 31b54f827b..4aba2494e9 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -68,9 +68,11 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # But all others are broken - # First timeline would fail instantly due to corrupt metadata file + # First timeline would not get loaded into pageserver due to corrupt metadata file (_tenant, _timeline, pg) = tenant_timelines[1] - with pytest.raises(Exception, match="Cannot load local timeline") as err: + with pytest.raises( + Exception, match=f"Could not get timeline {timeline1} in tenant {tenant1}" + ) as err: pg.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 8ee38fcf4f..a7b7189824 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -93,10 +93,7 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: ZTenantId): assert ZTenantId(timeline_details["tenant_id"]) == tenant_id assert ZTimelineId(timeline_details["timeline_id"]) == timeline_id - - local_timeline_details = timeline_details.get("local") - assert local_timeline_details is not None - assert local_timeline_details["timeline_state"] == "Loaded" + assert timeline_details.get("local") is not None def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):