diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ba8f6348ad..4553a6515d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -422,13 +422,53 @@ impl Tenant { init_order, CreateTimelineCause::Load, )?; - let new_disk_consistent_lsn = timeline.get_disk_consistent_lsn(); + let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( - new_disk_consistent_lsn.is_valid(), + disk_consistent_lsn.is_valid(), "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn" ); + assert_eq!( + disk_consistent_lsn, + up_to_date_metadata.disk_consistent_lsn(), + "these are used interchangeably" + ); + + // Save the metadata file to local disk. + if !picked_local { + save_metadata( + self.conf, + &tenant_id, + &timeline_id, + up_to_date_metadata, + first_save, + ) + .context("save_metadata")?; + } + + let index_part = remote_startup_data.as_ref().map(|x| &x.index_part); + + if let Some(index_part) = index_part { + timeline + .remote_client + .as_ref() + .unwrap() + .init_upload_queue(index_part)?; + } else if self.remote_storage.is_some() { + // No data on the remote storage, but we have local metadata file. We can end up + // here with timeline_create being interrupted before finishing index part upload. + // By doing what we do here, the index part upload is retried. + // If control plane retries timeline creation in the meantime, the mgmt API handler + // for timeline creation will coalesce on the upload we queue here. + let rtc = timeline.remote_client.as_ref().unwrap(); + rtc.init_upload_queue_for_empty_remote(up_to_date_metadata)?; + rtc.schedule_index_upload_for_metadata_update(up_to_date_metadata)?; + } + timeline - .load_layer_map(new_disk_consistent_lsn) + .load_layer_map( + disk_consistent_lsn, + remote_startup_data.map(|x| x.index_part), + ) .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") @@ -452,19 +492,6 @@ impl Tenant { } }; - if self.remote_storage.is_some() { - // Reconcile local state with remote storage, downloading anything that's - // missing locally, and scheduling uploads for anything that's missing - // in remote storage. - timeline - .reconcile_with_remote( - up_to_date_metadata, - remote_startup_data.as_ref().map(|r| &r.index_part), - ) - .await - .context("failed to reconcile with remote")? - } - // Sanity check: a timeline should have some content. anyhow::ensure!( ancestor.is_some() @@ -479,18 +506,6 @@ impl Tenant { "Timeline has no ancestor and no layer files" ); - // Save the metadata file to local disk. - if !picked_local { - save_metadata( - self.conf, - &tenant_id, - &timeline_id, - up_to_date_metadata, - first_save, - ) - .context("save_metadata")?; - } - Ok(()) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 0324197da6..a82e32b659 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -135,7 +135,7 @@ //! - Initiate upload queue with that [`IndexPart`]. //! - Reschedule all lost operations by comparing the local filesystem state //! and remote state as per [`IndexPart`]. This is done in -//! [`Tenant::timeline_init_and_sync`] and [`Timeline::reconcile_with_remote`]. +//! [`Tenant::timeline_init_and_sync`]. //! //! Note that if we crash during file deletion between the index update //! that removes the file from the list of files, and deleting the remote file, @@ -172,7 +172,6 @@ //! 
transitioning it from `TenantState::Attaching` to `TenantState::Active` state. //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops. //! -//! Most of the above steps happen in [`Timeline::reconcile_with_remote`] or its callers. //! We keep track of the fact that a client is in `Attaching` state in a marker //! file on the local disk. This is critical because, when we restart the pageserver, //! we do not want to do the `List timelines` step for each tenant that has already @@ -192,14 +191,14 @@ //! not created and the uploads are skipped. //! Theoretically, it should be ok to remove and re-add remote storage configuration to //! the pageserver config at any time, since it doesn't make a difference to -//! `reconcile_with_remote`. +//! [`Timeline::load_layer_map`]. //! Of course, the remote timeline dir must not change while we have de-configured //! remote storage, i.e., the pageserver must remain the owner of the given prefix //! in remote storage. //! But note that we don't test any of this right now. //! //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync -//! [`Timeline::reconcile_with_remote`]: super::Timeline::reconcile_with_remote +//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map mod delete; mod download; @@ -355,6 +354,10 @@ impl RemoteTimelineClient { let mut upload_queue = self.upload_queue.lock().unwrap(); upload_queue.initialize_with_current_remote_index_part(index_part)?; self.update_remote_physical_size_gauge(Some(index_part)); + info!( + "initialized upload queue from remote index with {} layer files", + index_part.layer_metadata.len() + ); Ok(()) } @@ -367,6 +370,7 @@ impl RemoteTimelineClient { let mut upload_queue = self.upload_queue.lock().unwrap(); upload_queue.initialize_empty_remote(local_metadata)?; self.update_remote_physical_size_gauge(None); + info!("initialized upload queue as empty"); Ok(()) } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6fffeece1c..a39e041eaf 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; -use super::timeline::layer_manager::LayerManager; - pub fn range_overlaps(a: &Range, b: &Range) -> bool where T: PartialOrd, @@ -175,16 +173,9 @@ impl LayerAccessStats { /// /// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad /// [`record_residence_event`]: Self::record_residence_event - pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &LayerManager, - status: LayerResidenceStatus, - ) -> Self { + pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); - new.record_residence_event( - layer_map_lock_held_witness, - status, - LayerResidenceEventReason::LayerLoad, - ); + new.record_residence_event(status, LayerResidenceEventReason::LayerLoad); new } @@ -197,7 +188,6 @@ impl LayerAccessStats { /// [`record_residence_event`]: Self::record_residence_event pub(crate) fn clone_for_residence_change( &self, - layer_map_lock_held_witness: &LayerManager, new_status: LayerResidenceStatus, ) -> LayerAccessStats { let clone = { @@ -205,11 +195,7 @@ impl LayerAccessStats { inner.clone() }; let new = LayerAccessStats(Mutex::new(clone)); - new.record_residence_event( - layer_map_lock_held_witness, - new_status, - 
LayerResidenceEventReason::ResidenceChange, - ); + new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange); new } @@ -229,7 +215,6 @@ impl LayerAccessStats { /// pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &LayerManager, status: LayerResidenceStatus, reason: LayerResidenceEventReason, ) { diff --git a/pageserver/src/tenant/storage_layer/filename.rs b/pageserver/src/tenant/storage_layer/filename.rs index 843bb1f631..922bb18bff 100644 --- a/pageserver/src/tenant/storage_layer/filename.rs +++ b/pageserver/src/tenant/storage_layer/filename.rs @@ -215,6 +215,17 @@ impl LayerFileName { pub fn file_name(&self) -> String { self.to_string() } + + /// Determines if this layer file is considered to be in future meaning we will discard these + /// layers during timeline initialization from the given disk_consistent_lsn. + pub(crate) fn is_in_future(&self, disk_consistent_lsn: Lsn) -> bool { + use LayerFileName::*; + match self { + Image(file_name) if file_name.lsn > disk_consistent_lsn => true, + Delta(file_name) if file_name.lsn_range.end > disk_consistent_lsn + 1 => true, + _ => false, + } + } } impl fmt::Display for LayerFileName { diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 6ccdbb4fdf..3f8d700863 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -185,7 +185,7 @@ impl RemoteLayer { /// Create a Layer struct representing this layer, after it has been downloaded. pub(crate) fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &LayerManager, + _layer_map_lock_held_witness: &LayerManager, conf: &'static PageServerConf, file_size: u64, ) -> Arc { @@ -197,10 +197,8 @@ impl RemoteLayer { self.desc.tenant_id, &fname, file_size, - self.access_stats.clone_for_residence_change( - layer_map_lock_held_witness, - LayerResidenceStatus::Resident, - ), + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } else { let fname = self.desc.image_file_name(); @@ -210,10 +208,8 @@ impl RemoteLayer { self.desc.tenant_id, &fname, file_size, - self.access_stats.clone_for_residence_change( - layer_map_lock_held_witness, - LayerResidenceStatus::Resident, - ), + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4880293c33..6245c639dd 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,5 +1,6 @@ pub mod delete; mod eviction_task; +mod init; pub mod layer_manager; mod logical_size; pub mod span; @@ -27,7 +28,6 @@ use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; use std::collections::{BinaryHeap, HashMap, HashSet}; -use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::pin::pin; @@ -38,15 +38,13 @@ use std::time::{Duration, Instant, SystemTime}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, }; -use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; +use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ - DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, - LayerAccessStats, LayerFileName, RemoteLayer, + DeltaLayerWriter, ImageLayerWriter, 
InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer, }; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ - ephemeral_file::is_ephemeral_file, layer_map::{LayerMap, SearchResult}, metadata::{save_metadata, TimelineMetadata}, par_fsync, @@ -78,11 +76,10 @@ use utils::{ use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; +use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::walredo::WalRedoManager; -use crate::METADATA_FILE_NAME; use crate::ZERO_PAGE; -use crate::{is_temporary, task_mgr}; use self::delete::DeleteTimelineFlow; pub(super) use self::eviction_task::EvictionTaskTenantState; @@ -1211,7 +1208,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted), + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, @@ -1220,7 +1217,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted), + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), }); @@ -1518,7 +1515,7 @@ impl Timeline { let layer_flush_start_rx = self.layer_flush_start_tx.subscribe(); let self_clone = Arc::clone(self); - info!("spawning flush loop"); + debug!("spawning flush loop"); *flush_loop_state = FlushLoopState::Running { #[cfg(test)] expect_initdb_optimization: false, @@ -1589,9 +1586,7 @@ impl Timeline { )); } - /// /// Initialize with an empty layer map. Used when creating a new timeline. - /// pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) { let mut layers = self.layers.try_write().expect( "in the context where we call this function, no other task has access to the object", @@ -1599,10 +1594,16 @@ impl Timeline { layers.initialize_empty(Lsn(start_lsn.0)); } - /// - /// Scan the timeline directory to populate the layer map. - /// - pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only + /// files. + pub(super) async fn load_layer_map( + &self, + disk_consistent_lsn: Lsn, + index_part: Option, + ) -> anyhow::Result<()> { + use init::{Decision::*, Discovered, FutureLayer}; + use LayerFileName::*; + let mut guard = self.layers.write().await; let timer = self.metrics.load_layer_map_histo.start_timer(); @@ -1610,102 +1611,150 @@ impl Timeline { // Scan timeline directory and create ImageFileName and DeltaFilename // structs representing all files on disk let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); - // total size of layer files in the current timeline directory - let mut total_physical_size = 0; + let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id); + let span = tracing::Span::current(); - let mut loaded_layers = Vec::>::new(); + let (loaded_layers, needs_upload, total_physical_size) = tokio::task::spawn_blocking({ + move || { + let _g = span.entered(); + let discovered = init::scan_timeline_dir(&timeline_path)?; + let mut discovered_layers = Vec::with_capacity(discovered.len()); + let mut unrecognized_files = Vec::new(); - for direntry in fs::read_dir(timeline_path)? 
{ - let direntry = direntry?; - let direntry_path = direntry.path(); - let fname = direntry.file_name(); - let fname = fname.to_string_lossy(); + let mut path = timeline_path; - if let Some(filename) = ImageFileName::parse_str(&fname) { - // create an ImageLayer struct for each image file. - if filename.lsn > disk_consistent_lsn { - info!( - "found future image layer {} on timeline {} disk_consistent_lsn is {}", - filename, self.timeline_id, disk_consistent_lsn - ); - - rename_to_backup(&direntry_path)?; - continue; + for discovered in discovered { + let (name, kind) = match discovered { + Discovered::Layer(file_name, file_size) => { + discovered_layers.push((file_name, file_size)); + continue; + } + Discovered::Metadata | Discovered::IgnoredBackup => { + continue; + } + Discovered::Unknown(file_name) => { + // we will later error if there are any + unrecognized_files.push(file_name); + continue; + } + Discovered::Ephemeral(name) => (name, "old ephemeral file"), + Discovered::Temporary(name) => (name, "temporary timeline file"), + Discovered::TemporaryDownload(name) => (name, "temporary download"), + }; + path.push(name); + init::cleanup(&path, kind)?; + path.pop(); } - let file_size = direntry_path.metadata()?.len(); - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident); - - let layer = ImageLayer::new( - self.conf, - self.timeline_id, - self.tenant_id, - &filename, - file_size, - stats, - ); - - total_physical_size += file_size; - loaded_layers.push(Arc::new(layer)); - } else if let Some(filename) = DeltaFileName::parse_str(&fname) { - // Create a DeltaLayer struct for each delta file. - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - if filename.lsn_range.end > disk_consistent_lsn + 1 { - info!( - "found future delta layer {} on timeline {} disk_consistent_lsn is {}", - filename, self.timeline_id, disk_consistent_lsn + if !unrecognized_files.is_empty() { + // assume that if there are any there are many many. 
+ let n = unrecognized_files.len(); + let first = &unrecognized_files[..n.min(10)]; + anyhow::bail!( + "unrecognized files in timeline dir (total {n}), first 10: {first:?}" ); - - rename_to_backup(&direntry_path)?; - continue; } - let file_size = direntry_path.metadata()?.len(); - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident); + let decided = + init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn); - let layer = DeltaLayer::new( - self.conf, - self.timeline_id, - self.tenant_id, - &filename, - file_size, - stats, - ); + let mut loaded_layers = Vec::new(); + let mut needs_upload = Vec::new(); + let mut total_physical_size = 0; - total_physical_size += file_size; - loaded_layers.push(Arc::new(layer)); - } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { - // ignore these - } else if remote_timeline_client::is_temp_download_file(&direntry_path) { - info!( - "skipping temp download file, reconcile_with_remote will resume / clean up: {}", - fname - ); - } else if is_ephemeral_file(&fname) { - // Delete any old ephemeral files - trace!("deleting old ephemeral file in timeline dir: {}", fname); - fs::remove_file(&direntry_path)?; - } else if is_temporary(&direntry_path) { - info!("removing temp timeline file at {}", direntry_path.display()); - fs::remove_file(&direntry_path).with_context(|| { - format!( - "failed to remove temp download file at {}", - direntry_path.display() - ) - })?; - } else { - warn!("unrecognized filename in timeline dir: {}", fname); + for (name, decision) in decided { + let decision = match decision { + Ok(UseRemote { local, remote }) => { + path.push(name.file_name()); + init::cleanup_local_file_for_remote(&path, &local, &remote)?; + path.pop(); + + UseRemote { local, remote } + } + Ok(decision) => decision, + Err(FutureLayer { local }) => { + if local.is_some() { + path.push(name.file_name()); + init::cleanup_future_layer(&path, name, disk_consistent_lsn)?; + path.pop(); + } else { + // we cannot do anything for remote layers, but not continuing to + // process it will leave it out index_part.json as well. + } + // + // we do not currently schedule deletions for these. + continue; + } + }; + + match &name { + Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1), + Image(i) => assert!(i.lsn <= disk_consistent_lsn), + } + + let status = match &decision { + UseLocal(_) | NeedsUpload(_) => LayerResidenceStatus::Resident, + Evicted(_) | UseRemote { .. } => LayerResidenceStatus::Evicted, + }; + + let stats = LayerAccessStats::for_loading_layer(status); + + let layer: Arc = match (name, &decision) { + (Delta(d), UseLocal(m) | NeedsUpload(m)) => { + total_physical_size += m.file_size(); + Arc::new(DeltaLayer::new( + conf, + timeline_id, + tenant_id, + &d, + m.file_size(), + stats, + )) + } + (Image(i), UseLocal(m) | NeedsUpload(m)) => { + total_physical_size += m.file_size(); + Arc::new(ImageLayer::new( + conf, + timeline_id, + tenant_id, + &i, + m.file_size(), + stats, + )) + } + (Delta(d), Evicted(remote) | UseRemote { remote, .. }) => Arc::new( + RemoteLayer::new_delta(tenant_id, timeline_id, &d, remote, stats), + ), + (Image(i), Evicted(remote) | UseRemote { remote, .. 
}) => Arc::new( + RemoteLayer::new_img(tenant_id, timeline_id, &i, remote, stats), + ), + }; + + if let NeedsUpload(m) = decision { + needs_upload.push((layer.clone(), m)); + } + + loaded_layers.push(layer); + } + Ok((loaded_layers, needs_upload, total_physical_size)) } - } + }) + .await + .map_err(anyhow::Error::new) + .and_then(|x| x)?; let num_layers = loaded_layers.len(); - guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1); + + guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1); + + if let Some(rtc) = self.remote_client.as_ref() { + for (layer, m) in needs_upload { + rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?; + } + rtc.schedule_index_upload_for_file_changes()?; + // Tenant::create_timeline will wait for these uploads to happen before returning, or + // on retry. + } info!( "loaded layer map with {} layers at {}, total physical size: {}", @@ -1716,236 +1765,6 @@ impl Timeline { .set(total_physical_size); timer.stop_and_record(); - - Ok(()) - } - - async fn create_remote_layers( - &self, - index_part: &IndexPart, - local_layers: HashMap>, - up_to_date_disk_consistent_lsn: Lsn, - ) -> anyhow::Result>> { - // Are we missing some files that are present in remote storage? - // Create RemoteLayer instances for them. - let mut local_only_layers = local_layers; - - // We're holding a layer map lock for a while but this - // method is only called during init so it's fine. - let mut guard = self.layers.write().await; - - let mut corrupted_local_layers = Vec::new(); - let mut added_remote_layers = Vec::new(); - for remote_layer_name in index_part.layer_metadata.keys() { - let local_layer = local_only_layers.remove(remote_layer_name); - - let remote_layer_metadata = index_part - .layer_metadata - .get(remote_layer_name) - .map(LayerFileMetadata::from) - .with_context(|| { - format!( - "No remote layer metadata found for layer {}", - remote_layer_name.file_name() - ) - })?; - - // Is the local layer's size different from the size stored in the - // remote index file? - // If so, rename_to_backup those files & replace their local layer with - // a RemoteLayer in the layer map so that we re-download them on-demand. 
- if let Some(local_layer) = local_layer { - let local_layer_path = local_layer - .local_path() - .expect("caller must ensure that local_layers only contains local layers"); - ensure!( - local_layer_path.exists(), - "every layer from local_layers must exist on disk: {}", - local_layer_path.display() - ); - - let remote_size = remote_layer_metadata.file_size(); - let metadata = local_layer_path.metadata().with_context(|| { - format!( - "get file size of local layer {}", - local_layer_path.display() - ) - })?; - let local_size = metadata.len(); - if local_size != remote_size { - warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}"); - if let Err(err) = rename_to_backup(&local_layer_path) { - assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display()); - anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); - } else { - self.metrics.resident_physical_size_gauge.sub(local_size); - corrupted_local_layers.push(local_layer); - // fall-through to adding the remote layer - } - } else { - debug!( - "layer is present locally and file size matches remote, using it: {}", - local_layer_path.display() - ); - continue; - } - } - - info!( - "remote layer does not exist locally, creating remote layer: {}", - remote_layer_name.file_name() - ); - - match remote_layer_name { - LayerFileName::Image(imgfilename) => { - if imgfilename.lsn > up_to_date_disk_consistent_lsn { - info!( - "found future image layer {} on timeline {} remote_consistent_lsn is {}", - imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn - ); - continue; - } - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted); - - let remote_layer = RemoteLayer::new_img( - self.tenant_id, - self.timeline_id, - imgfilename, - &remote_layer_metadata, - stats, - ); - let remote_layer = Arc::new(remote_layer); - added_remote_layers.push(remote_layer); - } - LayerFileName::Delta(deltafilename) => { - // Create a RemoteLayer for the delta file. - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 { - info!( - "found future delta layer {} on timeline {} remote_consistent_lsn is {}", - deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn - ); - continue; - } - let stats = - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted); - - let remote_layer = RemoteLayer::new_delta( - self.tenant_id, - self.timeline_id, - deltafilename, - &remote_layer_metadata, - stats, - ); - let remote_layer = Arc::new(remote_layer); - added_remote_layers.push(remote_layer); - } - } - } - guard.initialize_remote_layers(corrupted_local_layers, added_remote_layers); - Ok(local_only_layers) - } - - /// This function will synchronize local state with what we have in remote storage. - /// - /// Steps taken: - /// 1. Initialize upload queue based on `index_part`. - /// 2. Create `RemoteLayer` instances for layers that exist only on the remote. - /// The list of layers on the remote comes from `index_part`. - /// The list of local layers is given by the layer map's `iter_historic_layers()`. - /// So, the layer map must have been loaded already. - /// 3. 
Schedule upload of local-only layer files (which will then also update the remote - /// IndexPart to include the new layer files). - /// - /// Refer to the [`remote_timeline_client`] module comment for more context. - /// - /// # TODO - /// May be a bit cleaner to do things based on populated remote client, - /// and then do things based on its upload_queue.latest_files. - #[instrument(skip(self, index_part, up_to_date_metadata))] - pub async fn reconcile_with_remote( - &self, - up_to_date_metadata: &TimelineMetadata, - index_part: Option<&IndexPart>, - ) -> anyhow::Result<()> { - info!("starting"); - let remote_client = self - .remote_client - .as_ref() - .ok_or_else(|| anyhow!("cannot download without remote storage"))?; - - let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); - - let local_layers = { - let guard = self.layers.read().await; - let layers = guard.layer_map(); - layers - .iter_historic_layers() - .map(|l| (l.filename(), guard.get_from_desc(&l))) - .collect::>() - }; - - // If no writes happen, new branches do not have any layers, only the metadata file. - let has_local_layers = !local_layers.is_empty(); - let local_only_layers = match index_part { - Some(index_part) => { - info!( - "initializing upload queue from remote index with {} layer files", - index_part.layer_metadata.len() - ); - remote_client.init_upload_queue(index_part)?; - self.create_remote_layers(index_part, local_layers, disk_consistent_lsn) - .await? - } - None => { - info!("initializing upload queue as empty"); - remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?; - local_layers - } - }; - - if has_local_layers { - // Are there local files that don't exist remotely? Schedule uploads for them. - // Local timeline metadata will get uploaded to remove along witht he layers. - for (layer_name, layer) in &local_only_layers { - // XXX solve this in the type system - let layer_path = layer - .local_path() - .expect("local_only_layers only contains local layers"); - let layer_size = layer_path - .metadata() - .with_context(|| format!("failed to get file {layer_path:?} metadata"))? - .len(); - info!("scheduling {layer_path:?} for upload"); - remote_client - .schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?; - } - remote_client.schedule_index_upload_for_file_changes()?; - } else if index_part.is_none() { - // No data on the remote storage, no local layers, local metadata file. - // - // TODO https://github.com/neondatabase/neon/issues/3865 - // Currently, console does not wait for the timeline data upload to the remote storage - // and considers the timeline created, expecting other pageserver nodes to work with it. - // Branch metadata upload could get interrupted (e.g pageserver got killed), - // hence any locally existing branch metadata with no remote counterpart should be uploaded, - // otherwise any other pageserver won't see the branch on `attach`. - // - // After the issue gets implemented, pageserver should rather remove the branch, - // since absence on S3 means we did not acknowledge the branch creation and console will have to retry, - // no need to keep the old files. - remote_client.schedule_index_upload_for_metadata_update(up_to_date_metadata)?; - } else { - // Local timeline has a metadata file, remote one too, both have no layers to sync. - } - - info!("Done"); - Ok(()) } @@ -2852,7 +2671,6 @@ impl Timeline { if let Some(ref l) = delta_layer_to_add { // TODO: move access stats, metrics update, etc. into layer manager. 
                l.access_stats().record_residence_event(
-                    &guard,
                     LayerResidenceStatus::Resident,
                     LayerResidenceEventReason::LayerCreate,
                 );
@@ -3241,7 +3059,6 @@ impl Timeline {
                 .add(metadata.len());
             let l = Arc::new(l);
             l.access_stats().record_residence_event(
-                &guard,
                 LayerResidenceStatus::Resident,
                 LayerResidenceEventReason::LayerCreate,
             );
@@ -3921,7 +3738,6 @@ impl Timeline {
                 new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
                 l.access_stats().record_residence_event(
-                    &guard,
                     LayerResidenceStatus::Resident,
                     LayerResidenceEventReason::LayerCreate,
                 );
@@ -4840,7 +4656,8 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
     for i in 0u32.. {
         new_path.set_file_name(format!("{filename}.{i}.old"));
         if !new_path.exists() {
-            std::fs::rename(path, &new_path)?;
+            std::fs::rename(path, &new_path)
+                .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
             return Ok(());
         }
     }
diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs
new file mode 100644
index 0000000000..cdd4227b44
--- /dev/null
+++ b/pageserver/src/tenant/timeline/init.rs
@@ -0,0 +1,199 @@
+use crate::{
+    is_temporary,
+    tenant::{
+        ephemeral_file::is_ephemeral_file,
+        remote_timeline_client::{
+            self,
+            index::{IndexPart, LayerFileMetadata},
+        },
+        storage_layer::LayerFileName,
+    },
+    METADATA_FILE_NAME,
+};
+use anyhow::Context;
+use std::{collections::HashMap, ffi::OsString, path::Path, str::FromStr};
+use utils::lsn::Lsn;
+
+/// Identified files in the timeline directory.
+pub(super) enum Discovered {
+    /// The only one we care about
+    Layer(LayerFileName, u64),
+    /// Old ephemeral files from previous launches, should be removed
+    Ephemeral(OsString),
+    /// Old temporary timeline files, unsure what these really are, should be removed
+    Temporary(OsString),
+    /// Temporary on-demand download files, should be removed
+    TemporaryDownload(OsString),
+    /// "metadata" file we persist locally and include in `index_part.json`
+    Metadata,
+    /// Backup file from previously future layers
+    IgnoredBackup,
+    /// Unrecognized, warn about these
+    Unknown(OsString),
+}
+
+/// Scans the timeline directory for interesting files.
+pub(super) fn scan_timeline_dir(path: &Path) -> anyhow::Result<Vec<Discovered>> {
+    let mut ret = Vec::new();
+
+    for direntry in std::fs::read_dir(path)? {
+        let direntry = direntry?;
+        let direntry_path = direntry.path();
+        let file_name = direntry.file_name();
+
+        let fname = file_name.to_string_lossy();
+
+        let discovered = match LayerFileName::from_str(&fname) {
+            Ok(file_name) => {
+                let file_size = direntry.metadata()?.len();
+                Discovered::Layer(file_name, file_size)
+            }
+            Err(_) => {
+                if fname == METADATA_FILE_NAME {
+                    Discovered::Metadata
+                } else if fname.ends_with(".old") {
+                    // ignore these
+                    Discovered::IgnoredBackup
+                } else if remote_timeline_client::is_temp_download_file(&direntry_path) {
+                    Discovered::TemporaryDownload(file_name)
+                } else if is_ephemeral_file(&fname) {
+                    Discovered::Ephemeral(file_name)
+                } else if is_temporary(&direntry_path) {
+                    Discovered::Temporary(file_name)
+                } else {
+                    Discovered::Unknown(file_name)
+                }
+            }
+        };
+
+        ret.push(discovered);
+    }
+
+    Ok(ret)
+}
+
+/// Decision on what to do with a layer file after considering its local and remote metadata.
+#[derive(Clone)]
+pub(super) enum Decision {
+    /// The layer is not present locally.
+    Evicted(LayerFileMetadata),
+    /// The layer is present locally, but local metadata does not match remote; we must
+    /// delete it and treat it as evicted.
+    UseRemote {
+        local: LayerFileMetadata,
+        remote: LayerFileMetadata,
+    },
+    /// The layer is present locally, and metadata matches.
+    UseLocal(LayerFileMetadata),
+    /// The layer is only known locally; it needs to be uploaded.
+    NeedsUpload(LayerFileMetadata),
+}
+
+/// The related layer is in the future compared to disk_consistent_lsn; it must not be loaded.
+#[derive(Debug)]
+pub(super) struct FutureLayer {
+    /// The local metadata. `None` if the layer is only known through [`IndexPart`].
+    pub(super) local: Option<LayerFileMetadata>,
+}
+
+/// Merges local discoveries and remote [`IndexPart`] into a collection of decisions.
+///
+/// This function should not gain additional reasons to fail beyond [`FutureLayer`]; consider
+/// adding such checks earlier, to [`scan_timeline_dir`].
+pub(super) fn reconcile(
+    discovered: Vec<(LayerFileName, u64)>,
+    index_part: Option<&IndexPart>,
+    disk_consistent_lsn: Lsn,
+) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> {
+    use Decision::*;
+
+    // name => (local, remote)
+    type Collected =
+        HashMap<LayerFileName, (Option<LayerFileMetadata>, Option<LayerFileMetadata>)>;
+
+    let mut discovered = discovered
+        .into_iter()
+        .map(|(name, file_size)| (name, (Some(LayerFileMetadata::new(file_size)), None)))
+        .collect::<Collected>();
+
+    // merge any index_part information, when available
+    index_part
+        .as_ref()
+        .map(|ip| ip.layer_metadata.iter())
+        .into_iter()
+        .flatten()
+        .map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
+        .for_each(|(name, metadata)| {
+            if let Some(existing) = discovered.get_mut(name) {
+                existing.1 = Some(metadata);
+            } else {
+                discovered.insert(name.to_owned(), (None, Some(metadata)));
+            }
+        });
+
+    discovered
+        .into_iter()
+        .map(|(name, (local, remote))| {
+            let decision = if name.is_in_future(disk_consistent_lsn) {
+                Err(FutureLayer { local })
+            } else {
+                Ok(match (local, remote) {
+                    (Some(local), Some(remote)) if local != remote => UseRemote { local, remote },
+                    (Some(x), Some(_)) => UseLocal(x),
+                    (None, Some(x)) => Evicted(x),
+                    (Some(x), None) => NeedsUpload(x),
+                    (None, None) => {
+                        unreachable!("there must not be any non-local non-remote files")
+                    }
+                })
+            };
+
+            (name, decision)
+        })
+        .collect::<Vec<_>>()
+}
+
+pub(super) fn cleanup(path: &Path, kind: &str) -> anyhow::Result<()> {
+    let file_name = path.file_name().expect("must be file path");
+    tracing::debug!(kind, ?file_name, "cleaning up");
+    std::fs::remove_file(path)
+        .with_context(|| format!("failed to remove {kind} at {}", path.display()))
+}
+
+pub(super) fn cleanup_local_file_for_remote(
+    path: &Path,
+    local: &LayerFileMetadata,
+    remote: &LayerFileMetadata,
+) -> anyhow::Result<()> {
+    let local_size = local.file_size();
+    let remote_size = remote.file_size();
+
+    let file_name = path.file_name().expect("must be file path");
+    tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
+    if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
+        assert!(
+            path.exists(),
+            "we would leave the local_layer without a file if this does not hold: {}",
+            path.display()
+        );
+        Err(err)
+    } else {
+        Ok(())
+    }
+}
+
+pub(super) fn cleanup_future_layer(
+    path: &Path,
+    name: LayerFileName,
+    disk_consistent_lsn: Lsn,
+) -> anyhow::Result<()> {
+    use LayerFileName::*;
+    let kind = match name {
+        Delta(_) => "delta",
+        Image(_) => "image",
+    };
+    // Future image layers can legitimately be produced for LSNs that have not yet been
+    // flushed to disk and are still stored only in the InMemoryLayer.
+ tracing::info!("found future {kind} layer {name} disk_consistent_lsn is {disk_consistent_lsn}"); + crate::tenant::timeline::rename_to_backup(path)?; + Ok(()) +} diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 25df8a4f52..5522ea1788 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -13,7 +13,7 @@ use crate::{ layer_map::{BatchedUpdates, LayerMap}, storage_layer::{ AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer, - PersistentLayerDesc, PersistentLayerKey, RemoteLayer, + PersistentLayerDesc, PersistentLayerKey, }, timeline::compare_arced_layers, }, @@ -85,21 +85,6 @@ impl LayerManager { self.layer_map.next_open_layer_at = Some(next_open_layer_at); } - pub(crate) fn initialize_remote_layers( - &mut self, - corrupted_local_layers: Vec>, - remote_layers: Vec>, - ) { - let mut updates = self.layer_map.batch_update(); - for layer in corrupted_local_layers { - Self::remove_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - for layer in remote_layers { - Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr); - } - updates.flush(); - } - /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer, /// called within `get_layer_for_write`. pub(crate) fn get_layer_for_write( @@ -265,16 +250,6 @@ impl LayerManager { mapping.insert(layer); } - /// Helper function to remove a layer into the layer map and file manager - fn remove_historic_layer( - layer: Arc, - updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, - ) { - updates.remove_historic(layer.layer_desc()); - mapping.remove(layer); - } - /// Removes the layer from local FS (if present) and from memory. /// Remote storage is not affected by this operation. fn delete_historic_layer( diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 0640e65e57..a4e86e0519 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -369,7 +369,7 @@ def test_download_remote_layers_api( filled_current_physical = get_api_current_physical_size() log.info(filled_current_physical) filled_size = get_resident_physical_size() - log.info(filled_size) + log.info(f"filled_size: {filled_size}") assert filled_current_physical == filled_size, "we don't yet do layer eviction" env.pageserver.stop() @@ -377,7 +377,7 @@ def test_download_remote_layers_api( # remove all the layer files # XXX only delete some of the layer files, to show that it really just downloads all the layers for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"): - log.info(f"unlinking layer {layer}") + log.info(f"unlinking layer {layer.name}") layer.unlink() # Shut down safekeepers before starting the pageserver. @@ -403,7 +403,7 @@ def test_download_remote_layers_api( filled_current_physical == get_api_current_physical_size() ), "current_physical_size is sum of loaded layer sizes, independent of whether local or remote" post_unlink_size = get_resident_physical_size() - log.info(post_unlink_size) + log.info(f"post_unlink_size: {post_unlink_size}") assert ( post_unlink_size < filled_size ), "we just deleted layers and didn't cause anything to re-download them yet"
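
The heart of this change is the decision table in `init::reconcile`, which merges what was found on local disk with what `index_part.json` lists. The following self-contained sketch restates that table; it substitutes plain `u64` file sizes for `LayerFileMetadata` and leaves out the `FutureLayer` case, both simplifications made only so the example compiles on its own:

// Sketch of the local/remote merge performed by init::reconcile. The real code
// keys the map by LayerFileName and compares LayerFileMetadata; here a layer is
// represented by its file size alone.
#[derive(Debug, PartialEq)]
enum Decision {
    /// Known only remotely: load it as a RemoteLayer.
    Evicted(u64),
    /// Known in both places but the metadata disagrees: back up the local file
    /// and fall back to the remote copy.
    UseRemote { local: u64, remote: u64 },
    /// Known in both places and consistent: use the local file.
    UseLocal(u64),
    /// Known only locally: use it and schedule an upload.
    NeedsUpload(u64),
}

fn decide(local: Option<u64>, remote: Option<u64>) -> Decision {
    match (local, remote) {
        (Some(local), Some(remote)) if local != remote => Decision::UseRemote { local, remote },
        (Some(local), Some(_)) => Decision::UseLocal(local),
        (None, Some(remote)) => Decision::Evicted(remote),
        (Some(local), None) => Decision::NeedsUpload(local),
        (None, None) => unreachable!("a layer is always discovered locally or listed remotely"),
    }
}

fn main() {
    assert_eq!(decide(Some(8192), Some(8192)), Decision::UseLocal(8192));
    assert_eq!(
        decide(Some(4096), Some(8192)),
        Decision::UseRemote { local: 4096, remote: 8192 }
    );
    assert_eq!(decide(None, Some(8192)), Decision::Evicted(8192));
    assert_eq!(decide(Some(8192), None), Decision::NeedsUpload(8192));
}

The `UseRemote` arm is what replaces the old size check in `create_remote_layers`: a local file whose metadata disagrees with the remote index is renamed to a `.old` backup and re-downloaded on demand.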
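The cutoff applied by `LayerFileName::is_in_future` differs between image and delta layers because a delta layer's end LSN is exclusive, as the comments removed from `load_layer_map` explained. A standalone sketch of the two comparisons, using bare `u64` values instead of `utils::lsn::Lsn` (an assumption made for illustration only):

// Image layers are stamped with a single LSN; they are "future" only if that LSN
// is strictly beyond disk_consistent_lsn.
fn image_is_in_future(image_lsn: u64, disk_consistent_lsn: u64) -> bool {
    image_lsn > disk_consistent_lsn
}

// Delta layers carry an end-exclusive LSN range, so an end of
// disk_consistent_lsn + 1 still covers only flushed data.
fn delta_is_in_future(lsn_range_end: u64, disk_consistent_lsn: u64) -> bool {
    lsn_range_end > disk_consistent_lsn + 1
}

fn main() {
    let disk_consistent_lsn = 100;
    assert!(!image_is_in_future(100, disk_consistent_lsn));
    assert!(image_is_in_future(101, disk_consistent_lsn));
    // end LSN 101 covers up to and including 100, so the layer is fully flushed
    assert!(!delta_is_in_future(101, disk_consistent_lsn));
    assert!(delta_is_in_future(102, disk_consistent_lsn));
}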
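`load_layer_map` now runs the directory scan inside `tokio::task::spawn_blocking`, which leaves two nested `Result`s to flatten: the `JoinError` from the blocking task and the scan's own `anyhow::Result`. A minimal sketch of that pattern, assuming a tokio runtime with the `rt` and `macros` features and the `anyhow` crate, as in the pageserver:

use anyhow::Context;

async fn scan_on_blocking_thread() -> anyhow::Result<usize> {
    tokio::task::spawn_blocking(|| -> anyhow::Result<usize> {
        // filesystem work would happen here; any failure becomes the inner Result
        Ok(42)
    })
    .await
    // outer error: the blocking task panicked or was cancelled
    .map_err(anyhow::Error::new)
    // flatten Result<Result<usize>> into Result<usize>
    .and_then(|inner| inner)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let n = scan_on_blocking_thread().await.context("scan failed")?;
    println!("scanned {n} entries");
    Ok(())
}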