From 76aa01c90f30a3d40355f0ef18e8448bb814385b Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 24 Aug 2023 16:07:40 +0300 Subject: [PATCH] refactor: single phase Timeline::load_layer_map (#5074) Current implementation first calls `load_layer_map`, which loads all local layers, cleans up files, and leaves the rest of the cleanup to a "second function". When the "second function" is finally called, it does not do the cleanup, and some of the first function's setup can be torn down. "Second function" is actually both `reconcile_with_remote` and `create_remote_layers`. This change makes it a bit more verbose but in one phase with the following sub-steps: 1. scan the timeline directory 2. delete extra files - now including on-demand download files - fixes #3660 3. reconcile the two sources of layers (directory, index_part) 4. rename_to_backup future layers, short layers 5. create the remaining as layers Needed by #4938. It was also noticed that this is blocking code in an `async fn` so just do it in a `spawn_blocking`, which should be healthy for our startup times. Other effects include, hopefully, a halving of `stat` calls; extra calls which were not done previously are now done for the future layers. 
Co-authored-by: Christian Schwarz Co-authored-by: John Spray --- pageserver/src/tenant.rs | 71 ++- .../src/tenant/remote_timeline_client.rs | 12 +- pageserver/src/tenant/storage_layer.rs | 21 +- .../src/tenant/storage_layer/filename.rs | 11 + .../src/tenant/storage_layer/remote_layer.rs | 14 +- pageserver/src/tenant/timeline.rs | 483 ++++++------------ pageserver/src/tenant/timeline/init.rs | 199 ++++++++ .../src/tenant/timeline/layer_manager.rs | 27 +- test_runner/regress/test_ondemand_download.py | 6 +- 9 files changed, 423 insertions(+), 421 deletions(-) create mode 100644 pageserver/src/tenant/timeline/init.rs diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ba8f6348ad..4553a6515d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -422,13 +422,53 @@ impl Tenant { init_order, CreateTimelineCause::Load, )?; - let new_disk_consistent_lsn = timeline.get_disk_consistent_lsn(); + let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( - new_disk_consistent_lsn.is_valid(), + disk_consistent_lsn.is_valid(), "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn" ); + assert_eq!( + disk_consistent_lsn, + up_to_date_metadata.disk_consistent_lsn(), + "these are used interchangeably" + ); + + // Save the metadata file to local disk. + if !picked_local { + save_metadata( + self.conf, + &tenant_id, + &timeline_id, + up_to_date_metadata, + first_save, + ) + .context("save_metadata")?; + } + + let index_part = remote_startup_data.as_ref().map(|x| &x.index_part); + + if let Some(index_part) = index_part { + timeline + .remote_client + .as_ref() + .unwrap() + .init_upload_queue(index_part)?; + } else if self.remote_storage.is_some() { + // No data on the remote storage, but we have local metadata file. We can end up + // here with timeline_create being interrupted before finishing index part upload. + // By doing what we do here, the index part upload is retried. 
+ // If control plane retries timeline creation in the meantime, the mgmt API handler + // for timeline creation will coalesce on the upload we queue here. + let rtc = timeline.remote_client.as_ref().unwrap(); + rtc.init_upload_queue_for_empty_remote(up_to_date_metadata)?; + rtc.schedule_index_upload_for_metadata_update(up_to_date_metadata)?; + } + timeline - .load_layer_map(new_disk_consistent_lsn) + .load_layer_map( + disk_consistent_lsn, + remote_startup_data.map(|x| x.index_part), + ) .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") @@ -452,19 +492,6 @@ impl Tenant { } }; - if self.remote_storage.is_some() { - // Reconcile local state with remote storage, downloading anything that's - // missing locally, and scheduling uploads for anything that's missing - // in remote storage. - timeline - .reconcile_with_remote( - up_to_date_metadata, - remote_startup_data.as_ref().map(|r| &r.index_part), - ) - .await - .context("failed to reconcile with remote")? - } - // Sanity check: a timeline should have some content. anyhow::ensure!( ancestor.is_some() @@ -479,18 +506,6 @@ impl Tenant { "Timeline has no ancestor and no layer files" ); - // Save the metadata file to local disk. - if !picked_local { - save_metadata( - self.conf, - &tenant_id, - &timeline_id, - up_to_date_metadata, - first_save, - ) - .context("save_metadata")?; - } - Ok(()) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 0324197da6..a82e32b659 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -135,7 +135,7 @@ //! - Initiate upload queue with that [`IndexPart`]. //! - Reschedule all lost operations by comparing the local filesystem state //! and remote state as per [`IndexPart`]. This is done in -//! [`Tenant::timeline_init_and_sync`] and [`Timeline::reconcile_with_remote`]. +//! [`Tenant::timeline_init_and_sync`]. //! 
//! Note that if we crash during file deletion between the index update //! that removes the file from the list of files, and deleting the remote file, @@ -172,7 +172,6 @@ //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state. //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops. //! -//! Most of the above steps happen in [`Timeline::reconcile_with_remote`] or its callers. //! We keep track of the fact that a client is in `Attaching` state in a marker //! file on the local disk. This is critical because, when we restart the pageserver, //! we do not want to do the `List timelines` step for each tenant that has already @@ -192,14 +191,14 @@ //! not created and the uploads are skipped. //! Theoretically, it should be ok to remove and re-add remote storage configuration to //! the pageserver config at any time, since it doesn't make a difference to -//! `reconcile_with_remote`. +//! [`Timeline::load_layer_map`]. //! Of course, the remote timeline dir must not change while we have de-configured //! remote storage, i.e., the pageserver must remain the owner of the given prefix //! in remote storage. //! But note that we don't test any of this right now. //! //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync -//! [`Timeline::reconcile_with_remote`]: super::Timeline::reconcile_with_remote +//! 
[`Timeline::load_layer_map`]: super::Timeline::load_layer_map mod delete; mod download; @@ -355,6 +354,10 @@ impl RemoteTimelineClient { let mut upload_queue = self.upload_queue.lock().unwrap(); upload_queue.initialize_with_current_remote_index_part(index_part)?; self.update_remote_physical_size_gauge(Some(index_part)); + info!( + "initialized upload queue from remote index with {} layer files", + index_part.layer_metadata.len() + ); Ok(()) } @@ -367,6 +370,7 @@ impl RemoteTimelineClient { let mut upload_queue = self.upload_queue.lock().unwrap(); upload_queue.initialize_empty_remote(local_metadata)?; self.update_remote_physical_size_gauge(None); + info!("initialized upload queue as empty"); Ok(()) } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6fffeece1c..a39e041eaf 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; -use super::timeline::layer_manager::LayerManager; - pub fn range_overlaps(a: &Range, b: &Range) -> bool where T: PartialOrd, @@ -175,16 +173,9 @@ impl LayerAccessStats { /// /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad /// [`record_residence_event`]: Self::record_residence_event - pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &LayerManager, - status: LayerResidenceStatus, - ) -> Self { + pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); - new.record_residence_event( - layer_map_lock_held_witness, - status, - LayerResidenceEventReason::LayerLoad, - ); + new.record_residence_event(status, LayerResidenceEventReason::LayerLoad); new } @@ -197,7 +188,6 @@ impl LayerAccessStats { /// [`record_residence_event`]: Self::record_residence_event pub(crate) fn clone_for_residence_change( 
&self, - layer_map_lock_held_witness: &LayerManager, new_status: LayerResidenceStatus, ) -> LayerAccessStats { let clone = { @@ -205,11 +195,7 @@ impl LayerAccessStats { inner.clone() }; let new = LayerAccessStats(Mutex::new(clone)); - new.record_residence_event( - layer_map_lock_held_witness, - new_status, - LayerResidenceEventReason::ResidenceChange, - ); + new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange); new } @@ -229,7 +215,6 @@ impl LayerAccessStats { /// pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &LayerManager, status: LayerResidenceStatus, reason: LayerResidenceEventReason, ) { diff --git a/pageserver/src/tenant/storage_layer/filename.rs b/pageserver/src/tenant/storage_layer/filename.rs index 843bb1f631..922bb18bff 100644 --- a/pageserver/src/tenant/storage_layer/filename.rs +++ b/pageserver/src/tenant/storage_layer/filename.rs @@ -215,6 +215,17 @@ impl LayerFileName { pub fn file_name(&self) -> String { self.to_string() } + + /// Determines if this layer file is considered to be in future meaning we will discard these + /// layers during timeline initialization from the given disk_consistent_lsn. + pub(crate) fn is_in_future(&self, disk_consistent_lsn: Lsn) -> bool { + use LayerFileName::*; + match self { + Image(file_name) if file_name.lsn > disk_consistent_lsn => true, + Delta(file_name) if file_name.lsn_range.end > disk_consistent_lsn + 1 => true, + _ => false, + } + } } impl fmt::Display for LayerFileName { diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 6ccdbb4fdf..3f8d700863 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -185,7 +185,7 @@ impl RemoteLayer { /// Create a Layer struct representing this layer, after it has been downloaded. 
pub(crate) fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &LayerManager, + _layer_map_lock_held_witness: &LayerManager, conf: &'static PageServerConf, file_size: u64, ) -> Arc { @@ -197,10 +197,8 @@ impl RemoteLayer { self.desc.tenant_id, &fname, file_size, - self.access_stats.clone_for_residence_change( - layer_map_lock_held_witness, - LayerResidenceStatus::Resident, - ), + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } else { let fname = self.desc.image_file_name(); @@ -210,10 +208,8 @@ impl RemoteLayer { self.desc.tenant_id, &fname, file_size, - self.access_stats.clone_for_residence_change( - layer_map_lock_held_witness, - LayerResidenceStatus::Resident, - ), + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4880293c33..6245c639dd 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,5 +1,6 @@ pub mod delete; mod eviction_task; +mod init; pub mod layer_manager; mod logical_size; pub mod span; @@ -27,7 +28,6 @@ use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; use std::collections::{BinaryHeap, HashMap, HashSet}; -use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::pin::pin; @@ -38,15 +38,13 @@ use std::time::{Duration, Instant, SystemTime}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, }; -use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; +use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ - DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, - LayerAccessStats, LayerFileName, RemoteLayer, + DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, 
LayerFileName, RemoteLayer, }; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ - ephemeral_file::is_ephemeral_file, layer_map::{LayerMap, SearchResult}, metadata::{save_metadata, TimelineMetadata}, par_fsync, @@ -78,11 +76,10 @@ use utils::{ use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; +use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::walredo::WalRedoManager; -use crate::METADATA_FILE_NAME; use crate::ZERO_PAGE; -use crate::{is_temporary, task_mgr}; use self::delete::DeleteTimelineFlow; pub(super) use self::eviction_task::EvictionTaskTenantState; @@ -1211,7 +1208,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted), + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, @@ -1220,7 +1217,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted), + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), }); @@ -1518,7 +1515,7 @@ impl Timeline { let layer_flush_start_rx = self.layer_flush_start_tx.subscribe(); let self_clone = Arc::clone(self); - info!("spawning flush loop"); + debug!("spawning flush loop"); *flush_loop_state = FlushLoopState::Running { #[cfg(test)] expect_initdb_optimization: false, @@ -1589,9 +1586,7 @@ impl Timeline { )); } - /// /// Initialize with an empty layer map. Used when creating a new timeline. - /// pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) { let mut layers = self.layers.try_write().expect( "in the context where we call this function, no other task has access to the object", @@ -1599,10 +1594,16 @@ impl Timeline { layers.initialize_empty(Lsn(start_lsn.0)); } - /// - /// Scan the timeline directory to populate the layer map. 
- /// - pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only + /// files. + pub(super) async fn load_layer_map( + &self, + disk_consistent_lsn: Lsn, + index_part: Option, + ) -> anyhow::Result<()> { + use init::{Decision::*, Discovered, FutureLayer}; + use LayerFileName::*; + let mut guard = self.layers.write().await; let timer = self.metrics.load_layer_map_histo.start_timer(); @@ -1610,102 +1611,150 @@ impl Timeline { // Scan timeline directory and create ImageFileName and DeltaFilename // structs representing all files on disk let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); - // total size of layer files in the current timeline directory - let mut total_physical_size = 0; + let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id); + let span = tracing::Span::current(); - let mut loaded_layers = Vec::>::new(); + let (loaded_layers, needs_upload, total_physical_size) = tokio::task::spawn_blocking({ + move || { + let _g = span.entered(); + let discovered = init::scan_timeline_dir(&timeline_path)?; + let mut discovered_layers = Vec::with_capacity(discovered.len()); + let mut unrecognized_files = Vec::new(); - for direntry in fs::read_dir(timeline_path)? { - let direntry = direntry?; - let direntry_path = direntry.path(); - let fname = direntry.file_name(); - let fname = fname.to_string_lossy(); + let mut path = timeline_path; - if let Some(filename) = ImageFileName::parse_str(&fname) { - // create an ImageLayer struct for each image file. 
- if filename.lsn > disk_consistent_lsn { - info!( - "found future image layer {} on timeline {} disk_consistent_lsn is {}", - filename, self.timeline_id, disk_consistent_lsn - ); - - rename_to_backup(&direntry_path)?; - continue; + for discovered in discovered { + let (name, kind) = match discovered { + Discovered::Layer(file_name, file_size) => { + discovered_layers.push((file_name, file_size)); + continue; + } + Discovered::Metadata | Discovered::IgnoredBackup => { + continue; + } + Discovered::Unknown(file_name) => { + // we will later error if there are any + unrecognized_files.push(file_name); + continue; + } + Discovered::Ephemeral(name) => (name, "old ephemeral file"), + Discovered::Temporary(name) => (name, "temporary timeline file"), + Discovered::TemporaryDownload(name) => (name, "temporary download"), + }; + path.push(name); + init::cleanup(&path, kind)?; + path.pop(); } - let file_size = direntry_path.metadata()?.len(); - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident); - - let layer = ImageLayer::new( - self.conf, - self.timeline_id, - self.tenant_id, - &filename, - file_size, - stats, - ); - - total_physical_size += file_size; - loaded_layers.push(Arc::new(layer)); - } else if let Some(filename) = DeltaFileName::parse_str(&fname) { - // Create a DeltaLayer struct for each delta file. - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - if filename.lsn_range.end > disk_consistent_lsn + 1 { - info!( - "found future delta layer {} on timeline {} disk_consistent_lsn is {}", - filename, self.timeline_id, disk_consistent_lsn + if !unrecognized_files.is_empty() { + // assume that if there are any there are many many. 
+ let n = unrecognized_files.len(); + let first = &unrecognized_files[..n.min(10)]; + anyhow::bail!( + "unrecognized files in timeline dir (total {n}), first 10: {first:?}" ); - - rename_to_backup(&direntry_path)?; - continue; } - let file_size = direntry_path.metadata()?.len(); - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident); + let decided = + init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn); - let layer = DeltaLayer::new( - self.conf, - self.timeline_id, - self.tenant_id, - &filename, - file_size, - stats, - ); + let mut loaded_layers = Vec::new(); + let mut needs_upload = Vec::new(); + let mut total_physical_size = 0; - total_physical_size += file_size; - loaded_layers.push(Arc::new(layer)); - } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { - // ignore these - } else if remote_timeline_client::is_temp_download_file(&direntry_path) { - info!( - "skipping temp download file, reconcile_with_remote will resume / clean up: {}", - fname - ); - } else if is_ephemeral_file(&fname) { - // Delete any old ephemeral files - trace!("deleting old ephemeral file in timeline dir: {}", fname); - fs::remove_file(&direntry_path)?; - } else if is_temporary(&direntry_path) { - info!("removing temp timeline file at {}", direntry_path.display()); - fs::remove_file(&direntry_path).with_context(|| { - format!( - "failed to remove temp download file at {}", - direntry_path.display() - ) - })?; - } else { - warn!("unrecognized filename in timeline dir: {}", fname); + for (name, decision) in decided { + let decision = match decision { + Ok(UseRemote { local, remote }) => { + path.push(name.file_name()); + init::cleanup_local_file_for_remote(&path, &local, &remote)?; + path.pop(); + + UseRemote { local, remote } + } + Ok(decision) => decision, + Err(FutureLayer { local }) => { + if local.is_some() { + path.push(name.file_name()); + init::cleanup_future_layer(&path, name, disk_consistent_lsn)?; + 
path.pop(); + } else { + // we cannot do anything for remote layers, but not continuing to + // process it will leave it out index_part.json as well. + } + // + // we do not currently schedule deletions for these. + continue; + } + }; + + match &name { + Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1), + Image(i) => assert!(i.lsn <= disk_consistent_lsn), + } + + let status = match &decision { + UseLocal(_) | NeedsUpload(_) => LayerResidenceStatus::Resident, + Evicted(_) | UseRemote { .. } => LayerResidenceStatus::Evicted, + }; + + let stats = LayerAccessStats::for_loading_layer(status); + + let layer: Arc = match (name, &decision) { + (Delta(d), UseLocal(m) | NeedsUpload(m)) => { + total_physical_size += m.file_size(); + Arc::new(DeltaLayer::new( + conf, + timeline_id, + tenant_id, + &d, + m.file_size(), + stats, + )) + } + (Image(i), UseLocal(m) | NeedsUpload(m)) => { + total_physical_size += m.file_size(); + Arc::new(ImageLayer::new( + conf, + timeline_id, + tenant_id, + &i, + m.file_size(), + stats, + )) + } + (Delta(d), Evicted(remote) | UseRemote { remote, .. }) => Arc::new( + RemoteLayer::new_delta(tenant_id, timeline_id, &d, remote, stats), + ), + (Image(i), Evicted(remote) | UseRemote { remote, .. 
}) => Arc::new( + RemoteLayer::new_img(tenant_id, timeline_id, &i, remote, stats), + ), + }; + + if let NeedsUpload(m) = decision { + needs_upload.push((layer.clone(), m)); + } + + loaded_layers.push(layer); + } + Ok((loaded_layers, needs_upload, total_physical_size)) } - } + }) + .await + .map_err(anyhow::Error::new) + .and_then(|x| x)?; let num_layers = loaded_layers.len(); - guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1); + + guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1); + + if let Some(rtc) = self.remote_client.as_ref() { + for (layer, m) in needs_upload { + rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?; + } + rtc.schedule_index_upload_for_file_changes()?; + // Tenant::create_timeline will wait for these uploads to happen before returning, or + // on retry. + } info!( "loaded layer map with {} layers at {}, total physical size: {}", @@ -1716,236 +1765,6 @@ impl Timeline { .set(total_physical_size); timer.stop_and_record(); - - Ok(()) - } - - async fn create_remote_layers( - &self, - index_part: &IndexPart, - local_layers: HashMap>, - up_to_date_disk_consistent_lsn: Lsn, - ) -> anyhow::Result>> { - // Are we missing some files that are present in remote storage? - // Create RemoteLayer instances for them. - let mut local_only_layers = local_layers; - - // We're holding a layer map lock for a while but this - // method is only called during init so it's fine. 
- let mut guard = self.layers.write().await; - - let mut corrupted_local_layers = Vec::new(); - let mut added_remote_layers = Vec::new(); - for remote_layer_name in index_part.layer_metadata.keys() { - let local_layer = local_only_layers.remove(remote_layer_name); - - let remote_layer_metadata = index_part - .layer_metadata - .get(remote_layer_name) - .map(LayerFileMetadata::from) - .with_context(|| { - format!( - "No remote layer metadata found for layer {}", - remote_layer_name.file_name() - ) - })?; - - // Is the local layer's size different from the size stored in the - // remote index file? - // If so, rename_to_backup those files & replace their local layer with - // a RemoteLayer in the layer map so that we re-download them on-demand. - if let Some(local_layer) = local_layer { - let local_layer_path = local_layer - .local_path() - .expect("caller must ensure that local_layers only contains local layers"); - ensure!( - local_layer_path.exists(), - "every layer from local_layers must exist on disk: {}", - local_layer_path.display() - ); - - let remote_size = remote_layer_metadata.file_size(); - let metadata = local_layer_path.metadata().with_context(|| { - format!( - "get file size of local layer {}", - local_layer_path.display() - ) - })?; - let local_size = metadata.len(); - if local_size != remote_size { - warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}"); - if let Err(err) = rename_to_backup(&local_layer_path) { - assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display()); - anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); - } else { - self.metrics.resident_physical_size_gauge.sub(local_size); - corrupted_local_layers.push(local_layer); - // fall-through to adding the remote layer - } - } else { - debug!( - "layer is present locally and file size matches remote, using it: 
{}", - local_layer_path.display() - ); - continue; - } - } - - info!( - "remote layer does not exist locally, creating remote layer: {}", - remote_layer_name.file_name() - ); - - match remote_layer_name { - LayerFileName::Image(imgfilename) => { - if imgfilename.lsn > up_to_date_disk_consistent_lsn { - info!( - "found future image layer {} on timeline {} remote_consistent_lsn is {}", - imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn - ); - continue; - } - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted); - - let remote_layer = RemoteLayer::new_img( - self.tenant_id, - self.timeline_id, - imgfilename, - &remote_layer_metadata, - stats, - ); - let remote_layer = Arc::new(remote_layer); - added_remote_layers.push(remote_layer); - } - LayerFileName::Delta(deltafilename) => { - // Create a RemoteLayer for the delta file. - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 { - info!( - "found future delta layer {} on timeline {} remote_consistent_lsn is {}", - deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn - ); - continue; - } - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted); - - let remote_layer = RemoteLayer::new_delta( - self.tenant_id, - self.timeline_id, - deltafilename, - &remote_layer_metadata, - stats, - ); - let remote_layer = Arc::new(remote_layer); - added_remote_layers.push(remote_layer); - } - } - } - guard.initialize_remote_layers(corrupted_local_layers, added_remote_layers); - Ok(local_only_layers) - } - - /// This function will synchronize local state with what we have in remote storage. - /// - /// Steps taken: - /// 1. 
Initialize upload queue based on `index_part`. - /// 2. Create `RemoteLayer` instances for layers that exist only on the remote. - /// The list of layers on the remote comes from `index_part`. - /// The list of local layers is given by the layer map's `iter_historic_layers()`. - /// So, the layer map must have been loaded already. - /// 3. Schedule upload of local-only layer files (which will then also update the remote - /// IndexPart to include the new layer files). - /// - /// Refer to the [`remote_timeline_client`] module comment for more context. - /// - /// # TODO - /// May be a bit cleaner to do things based on populated remote client, - /// and then do things based on its upload_queue.latest_files. - #[instrument(skip(self, index_part, up_to_date_metadata))] - pub async fn reconcile_with_remote( - &self, - up_to_date_metadata: &TimelineMetadata, - index_part: Option<&IndexPart>, - ) -> anyhow::Result<()> { - info!("starting"); - let remote_client = self - .remote_client - .as_ref() - .ok_or_else(|| anyhow!("cannot download without remote storage"))?; - - let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); - - let local_layers = { - let guard = self.layers.read().await; - let layers = guard.layer_map(); - layers - .iter_historic_layers() - .map(|l| (l.filename(), guard.get_from_desc(&l))) - .collect::>() - }; - - // If no writes happen, new branches do not have any layers, only the metadata file. - let has_local_layers = !local_layers.is_empty(); - let local_only_layers = match index_part { - Some(index_part) => { - info!( - "initializing upload queue from remote index with {} layer files", - index_part.layer_metadata.len() - ); - remote_client.init_upload_queue(index_part)?; - self.create_remote_layers(index_part, local_layers, disk_consistent_lsn) - .await? 
- } - None => { - info!("initializing upload queue as empty"); - remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?; - local_layers - } - }; - - if has_local_layers { - // Are there local files that don't exist remotely? Schedule uploads for them. - // Local timeline metadata will get uploaded to remove along witht he layers. - for (layer_name, layer) in &local_only_layers { - // XXX solve this in the type system - let layer_path = layer - .local_path() - .expect("local_only_layers only contains local layers"); - let layer_size = layer_path - .metadata() - .with_context(|| format!("failed to get file {layer_path:?} metadata"))? - .len(); - info!("scheduling {layer_path:?} for upload"); - remote_client - .schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?; - } - remote_client.schedule_index_upload_for_file_changes()?; - } else if index_part.is_none() { - // No data on the remote storage, no local layers, local metadata file. - // - // TODO https://github.com/neondatabase/neon/issues/3865 - // Currently, console does not wait for the timeline data upload to the remote storage - // and considers the timeline created, expecting other pageserver nodes to work with it. - // Branch metadata upload could get interrupted (e.g pageserver got killed), - // hence any locally existing branch metadata with no remote counterpart should be uploaded, - // otherwise any other pageserver won't see the branch on `attach`. - // - // After the issue gets implemented, pageserver should rather remove the branch, - // since absence on S3 means we did not acknowledge the branch creation and console will have to retry, - // no need to keep the old files. - remote_client.schedule_index_upload_for_metadata_update(up_to_date_metadata)?; - } else { - // Local timeline has a metadata file, remote one too, both have no layers to sync. 
- } - - info!("Done"); - Ok(()) } @@ -2852,7 +2671,6 @@ impl Timeline { if let Some(ref l) = delta_layer_to_add { // TODO: move access stats, metrics update, etc. into layer manager. l.access_stats().record_residence_event( - &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); @@ -3241,7 +3059,6 @@ impl Timeline { .add(metadata.len()); let l = Arc::new(l); l.access_stats().record_residence_event( - &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); @@ -3921,7 +3738,6 @@ impl Timeline { new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); l.access_stats().record_residence_event( - &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); @@ -4840,7 +4656,8 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> { for i in 0u32.. { new_path.set_file_name(format!("{filename}.{i}.old")); if !new_path.exists() { - std::fs::rename(path, &new_path)?; + std::fs::rename(path, &new_path) + .with_context(|| format!("rename {path:?} to {new_path:?}"))?; return Ok(()); } } diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs new file mode 100644 index 0000000000..cdd4227b44 --- /dev/null +++ b/pageserver/src/tenant/timeline/init.rs @@ -0,0 +1,199 @@ +use crate::{ + is_temporary, + tenant::{ + ephemeral_file::is_ephemeral_file, + remote_timeline_client::{ + self, + index::{IndexPart, LayerFileMetadata}, + }, + storage_layer::LayerFileName, + }, + METADATA_FILE_NAME, +}; +use anyhow::Context; +use std::{collections::HashMap, ffi::OsString, path::Path, str::FromStr}; +use utils::lsn::Lsn; + +/// Identified files in the timeline directory. 
+pub(super) enum Discovered { + /// The only one we care about + Layer(LayerFileName, u64), + /// Old ephemeral files from previous launches, should be removed + Ephemeral(OsString), + /// Old temporary timeline files, unsure what these really are, should be removed + Temporary(OsString), + /// Temporary on-demand download files, should be removed + TemporaryDownload(OsString), + /// "metadata" file we persist locally and include in `index_part.json` + Metadata, + /// Backup file from previously future layers + IgnoredBackup, + /// Unrecognized, warn about these + Unknown(OsString), +} + +/// Scans the timeline directory for interesting files. +pub(super) fn scan_timeline_dir(path: &Path) -> anyhow::Result<Vec<Discovered>> { + let mut ret = Vec::new(); + + for direntry in std::fs::read_dir(path)? { + let direntry = direntry?; + let direntry_path = direntry.path(); + let file_name = direntry.file_name(); + + let fname = file_name.to_string_lossy(); + + let discovered = match LayerFileName::from_str(&fname) { + Ok(file_name) => { + let file_size = direntry.metadata()?.len(); + Discovered::Layer(file_name, file_size) + } + Err(_) => { + if fname == METADATA_FILE_NAME { + Discovered::Metadata + } else if fname.ends_with(".old") { + // ignore these + Discovered::IgnoredBackup + } else if remote_timeline_client::is_temp_download_file(&direntry_path) { + Discovered::TemporaryDownload(file_name) + } else if is_ephemeral_file(&fname) { + Discovered::Ephemeral(file_name) + } else if is_temporary(&direntry_path) { + Discovered::Temporary(file_name) + } else { + Discovered::Unknown(file_name) + } + } + }; + + ret.push(discovered); + } + + Ok(ret) +} + +/// Decision on what to do with a layer file after considering its local and remote metadata. +#[derive(Clone)] +pub(super) enum Decision { + /// The layer is not present locally. + Evicted(LayerFileMetadata), + /// The layer is present locally, but local metadata does not match remote; we must + /// delete it and treat it as evicted.
+ UseRemote { + local: LayerFileMetadata, + remote: LayerFileMetadata, + }, + /// The layer is present locally, and metadata matches. + UseLocal(LayerFileMetadata), + /// The layer is only known locally, it needs to be uploaded. + NeedsUpload(LayerFileMetadata), +} + +/// The related layer is in the future compared to disk_consistent_lsn, it must not be loaded. +#[derive(Debug)] +pub(super) struct FutureLayer { + /// The local metadata. `None` if the layer is only known through [`IndexPart`]. + pub(super) local: Option<LayerFileMetadata>, +} + +/// Merges local discoveries and remote [`IndexPart`] to a collection of decisions. +/// +/// This function should not gain additional reasons to fail other than [`FutureLayer`]; consider adding +/// the checks earlier to [`scan_timeline_dir`]. +pub(super) fn reconcile( + discovered: Vec<(LayerFileName, u64)>, + index_part: Option<&IndexPart>, + disk_consistent_lsn: Lsn, +) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> { + use Decision::*; + + // name => (local, remote) + type Collected = HashMap<LayerFileName, (Option<LayerFileMetadata>, Option<LayerFileMetadata>)>; + + let mut discovered = discovered + .into_iter() + .map(|(name, file_size)| (name, (Some(LayerFileMetadata::new(file_size)), None))) + .collect::<Collected>(); + + // merge any index_part information, when available + index_part + .as_ref() + .map(|ip| ip.layer_metadata.iter()) + .into_iter() + .flatten() + .map(|(name, metadata)| (name, LayerFileMetadata::from(metadata))) + .for_each(|(name, metadata)| { + if let Some(existing) = discovered.get_mut(name) { + existing.1 = Some(metadata); + } else { + discovered.insert(name.to_owned(), (None, Some(metadata))); + } + }); + + discovered + .into_iter() + .map(|(name, (local, remote))| { + let decision = if name.is_in_future(disk_consistent_lsn) { + Err(FutureLayer { local }) + } else { + Ok(match (local, remote) { + (Some(local), Some(remote)) if local != remote => UseRemote { local, remote }, + (Some(x), Some(_)) => UseLocal(x), + (None, Some(x)) => Evicted(x), + (Some(x), None) => NeedsUpload(x), + (None, None) => { +
unreachable!("there must not be any non-local non-remote files") + } + }) + }; + + (name, decision) + }) + .collect::<Vec<_>>() +} + +pub(super) fn cleanup(path: &Path, kind: &str) -> anyhow::Result<()> { + let file_name = path.file_name().expect("must be file path"); + tracing::debug!(kind, ?file_name, "cleaning up"); + std::fs::remove_file(path) + .with_context(|| format!("failed to remove {kind} at {}", path.display())) +} + +pub(super) fn cleanup_local_file_for_remote( + path: &Path, + local: &LayerFileMetadata, + remote: &LayerFileMetadata, +) -> anyhow::Result<()> { + let local_size = local.file_size(); + let remote_size = remote.file_size(); + + let file_name = path.file_name().expect("must be file path"); + tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}"); + if let Err(err) = crate::tenant::timeline::rename_to_backup(path) { + assert!( + path.exists(), + "we would leave the local_layer without a file if this does not hold: {}", + path.display() + ); + Err(err) + } else { + Ok(()) + } +} + +pub(super) fn cleanup_future_layer( + path: &Path, + name: LayerFileName, + disk_consistent_lsn: Lsn, +) -> anyhow::Result<()> { + use LayerFileName::*; + let kind = match name { + Delta(_) => "delta", + Image(_) => "image", + }; + // future image layers are allowed to be produced always for not yet flushed to disk + // lsns stored in InMemoryLayer.
+ tracing::info!("found future {kind} layer {name} disk_consistent_lsn is {disk_consistent_lsn}"); + crate::tenant::timeline::rename_to_backup(path)?; + Ok(()) +} diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 25df8a4f52..5522ea1788 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -13,7 +13,7 @@ use crate::{ layer_map::{BatchedUpdates, LayerMap}, storage_layer::{ AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer, - PersistentLayerDesc, PersistentLayerKey, RemoteLayer, + PersistentLayerDesc, PersistentLayerKey, }, timeline::compare_arced_layers, }, @@ -85,21 +85,6 @@ impl LayerManager { self.layer_map.next_open_layer_at = Some(next_open_layer_at); } - pub(crate) fn initialize_remote_layers( - &mut self, - corrupted_local_layers: Vec>, - remote_layers: Vec>, - ) { - let mut updates = self.layer_map.batch_update(); - for layer in corrupted_local_layers { - Self::remove_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - for layer in remote_layers { - Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - updates.flush(); - } - /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer, /// called within `get_layer_for_write`. pub(crate) fn get_layer_for_write( @@ -265,16 +250,6 @@ impl LayerManager { mapping.insert(layer); } - /// Helper function to remove a layer into the layer map and file manager - fn remove_historic_layer( - layer: Arc, - updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, - ) { - updates.remove_historic(layer.layer_desc()); - mapping.remove(layer); - } - /// Removes the layer from local FS (if present) and from memory. /// Remote storage is not affected by this operation. 
fn delete_historic_layer( diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 0640e65e57..a4e86e0519 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -369,7 +369,7 @@ def test_download_remote_layers_api( filled_current_physical = get_api_current_physical_size() log.info(filled_current_physical) filled_size = get_resident_physical_size() - log.info(filled_size) + log.info(f"filled_size: {filled_size}") assert filled_current_physical == filled_size, "we don't yet do layer eviction" env.pageserver.stop() @@ -377,7 +377,7 @@ def test_download_remote_layers_api( # remove all the layer files # XXX only delete some of the layer files, to show that it really just downloads all the layers for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"): - log.info(f"unlinking layer {layer}") + log.info(f"unlinking layer {layer.name}") layer.unlink() # Shut down safekeepers before starting the pageserver. @@ -403,7 +403,7 @@ def test_download_remote_layers_api( filled_current_physical == get_api_current_physical_size() ), "current_physical_size is sum of loaded layer sizes, independent of whether local or remote" post_unlink_size = get_resident_physical_size() - log.info(post_unlink_size) + log.info(f"post_unlink_size: {post_unlink_size}") assert ( post_unlink_size < filled_size ), "we just deleted layers and didn't cause anything to re-download them yet"