From 52a7e3155e3fd6132794f53623698e12e403f711 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Wed, 4 May 2022 14:53:18 +0300 Subject: [PATCH] Add local path to the Layer trait and historic layers --- pageserver/src/http/routes.rs | 6 +- pageserver/src/layered_repository.rs | 122 ++++++++++++++---- .../src/layered_repository/delta_layer.rs | 4 + .../src/layered_repository/image_layer.rs | 4 + .../src/layered_repository/inmemory_layer.rs | 4 + .../src/layered_repository/layer_map.rs | 2 +- .../src/layered_repository/storage_layer.rs | 3 + pageserver/src/remote_storage.rs | 8 +- pageserver/src/remote_storage/storage_sync.rs | 18 ++- 9 files changed, 131 insertions(+), 40 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 5903dea372..f12e4c4051 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -11,7 +11,7 @@ use super::models::{ }; use crate::config::RemoteStorageKind; use crate::remote_storage::{ - download_index_part, schedule_timeline_download, LocalFs, RemoteIndex, RemoteTimeline, S3Bucket, + download_index_part, schedule_layer_download, LocalFs, RemoteIndex, RemoteTimeline, S3Bucket, }; use crate::repository::Repository; use crate::tenant_config::TenantConfOpt; @@ -273,7 +273,7 @@ async fn timeline_attach_handler(request: Request) -> Result) -> Result index_accessor.add_timeline_entry(sync_id, new_timeline), } - schedule_timeline_download(tenant_id, timeline_id); + schedule_layer_download(tenant_id, timeline_id); json_response(StatusCode::ACCEPTED, ()) } diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 69271467a6..6719c22738 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -20,8 +20,8 @@ use tracing::*; use std::cmp::{max, min, Ordering}; use std::collections::hash_map::Entry; -use std::collections::BTreeSet; use std::collections::HashMap; +use std::collections::{BTreeSet, HashSet}; use std::fs; use std::fs::{File, OpenOptions}; use std::io::Write; @@ -37,7 +37,7 @@ use crate::keyspace::KeySpace; use crate::tenant_config::{TenantConf, TenantConfOpt}; use crate::page_cache; -use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex}; +use crate::remote_storage::{self, RemoteIndex}; use crate::repository::{ GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter, }; @@ -428,7 +428,7 @@ impl Repository for LayeredRepository { Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."), Entry::Vacant(entry) => { // we need to get metadata of a timeline, another option is to pass it along with Downloaded status - let metadata = Self::load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?; + let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?; // finally we make newly downloaded timeline visible to repository entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, }) }, @@ -618,7 +618,7 @@ impl LayeredRepository { timelineid: ZTimelineId, timelines: &mut HashMap, ) -> anyhow::Result> { - let metadata = Self::load_metadata(self.conf, timelineid, self.tenant_id) + let metadata = load_metadata(self.conf, timelineid, self.tenant_id) .context("failed to load metadata")?; let disk_consistent_lsn = metadata.disk_consistent_lsn(); @@ -776,17 +776,6 @@ impl LayeredRepository { Ok(()) } - fn load_metadata( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - ) -> Result { - let path = metadata_path(conf, timelineid, tenantid); - info!("loading metadata from {}", path.display()); - let metadata_bytes = std::fs::read(&path)?; - TimelineMetadata::from_bytes(&metadata_bytes) - } - // // How garbage collection works: // @@ -1796,10 +1785,10 @@ impl LayeredTimeline { PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len()); if self.upload_layers.load(atomic::Ordering::Relaxed) { - schedule_timeline_checkpoint_upload( + remote_storage::schedule_layer_upload( self.tenantid, self.timelineid, - new_delta_path, + HashSet::from([new_delta_path]), metadata, ); } @@ -1860,11 +1849,23 @@ impl LayeredTimeline { let timer = self.create_images_time_histo.start_timer(); // 2. Create new image layers for partitions that have been modified // "enough". + let mut layer_paths_to_upload = HashSet::with_capacity(partitioning.parts.len()); for part in partitioning.parts.iter() { if self.time_for_new_image_layer(part, lsn)? { - self.create_image_layer(part, lsn)?; + let new_path = self.create_image_layer(part, lsn)?; + layer_paths_to_upload.insert(new_path); } } + if self.upload_layers.load(atomic::Ordering::Relaxed) { + let metadata = load_metadata(self.conf, self.timelineid, self.tenantid) + .context("failed to load local metadata")?; + remote_storage::schedule_layer_upload( + self.tenantid, + self.timelineid, + layer_paths_to_upload, + metadata, + ); + } timer.stop_and_record(); // 3. Compact @@ -1906,7 +1907,7 @@ impl LayeredTimeline { Ok(false) } - fn create_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<()> { + fn create_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { let img_range = partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; let mut image_layer_writer = @@ -1939,10 +1940,11 @@ impl LayeredTimeline { // FIXME: Do we need to do something to upload it to remote storage here? let mut layers = self.layers.write().unwrap(); + let new_path = image_layer.path(); layers.insert_historic(Arc::new(image_layer)); drop(layers); - Ok(()) + Ok(new_path) } fn compact_level0(&self, target_file_size: u64) -> Result<()> { @@ -2037,18 +2039,43 @@ impl LayeredTimeline { } let mut layers = self.layers.write().unwrap(); + let mut new_layer_paths = HashSet::with_capacity(new_layers.len()); for l in new_layers { + new_layer_paths.insert(l.path()); layers.insert_historic(Arc::new(l)); } + if self.upload_layers.load(atomic::Ordering::Relaxed) { + let metadata = load_metadata(self.conf, self.timelineid, self.tenantid) + .context("failed to load local metadata")?; + remote_storage::schedule_layer_upload( + self.tenantid, + self.timelineid, + new_layer_paths, + metadata, + ); + } + // Now that we have reshuffled the data to set of new delta layers, we can // delete the old ones + let mut layer_paths_do_delete = HashSet::with_capacity(level0_deltas.len()); for l in level0_deltas { l.delete()?; - layers.remove_historic(l.clone()); + if let Some(path) = l.local_path() { + layer_paths_do_delete.insert(path); + } + layers.remove_historic(l); } drop(layers); + if self.upload_layers.load(atomic::Ordering::Relaxed) { + remote_storage::schedule_layer_delete( + self.tenantid, + self.timelineid, + layer_paths_do_delete, + ); + } + Ok(()) } @@ -2111,7 +2138,7 @@ impl LayeredTimeline { debug!("retain_lsns: {:?}", retain_lsns); - let mut layers_to_remove: Vec> = Vec::new(); + let mut layers_to_remove = Vec::new(); // Scan all on-disk layers in the timeline. // @@ -2222,13 +2249,24 @@ impl LayeredTimeline { // Actually delete the layers from disk and remove them from the map. // (couldn't do this in the loop above, because you cannot modify a collection // while iterating it. BTreeMap::retain() would be another option) + let mut layer_paths_to_delete = HashSet::with_capacity(layers_to_remove.len()); for doomed_layer in layers_to_remove { doomed_layer.delete()?; - layers.remove_historic(doomed_layer.clone()); - + if let Some(path) = doomed_layer.local_path() { + layer_paths_to_delete.insert(path); + } + layers.remove_historic(doomed_layer); result.layers_removed += 1; } + if self.upload_layers.load(atomic::Ordering::Relaxed) { + remote_storage::schedule_layer_delete( + self.tenantid, + self.timelineid, + layer_paths_to_delete, + ); + } + result.elapsed = now.elapsed()?; Ok(result) } @@ -2375,6 +2413,26 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> { bail!("couldn't find an unused backup number for {:?}", path) } +fn load_metadata( + conf: &'static PageServerConf, + timeline_id: ZTimelineId, + tenant_id: ZTenantId, +) -> anyhow::Result { + let metadata_path = metadata_path(conf, timeline_id, tenant_id); + let metadata_bytes = std::fs::read(&metadata_path).with_context(|| { + format!( + "Failed to read metadata bytes from path {}", + metadata_path.display() + ) + })?; + TimelineMetadata::from_bytes(&metadata_bytes).with_context(|| { + format!( + "Failed to parse metadata bytes from path {}", + metadata_path.display() + ) + }) +} + /// /// Tests that are specific to the layered storage format. /// @@ -2409,9 +2467,19 @@ pub mod tests { let err = harness.try_load().err().expect("should fail"); assert_eq!(err.to_string(), "failed to load local metadata"); - assert_eq!( - err.source().unwrap().to_string(), - "metadata checksum mismatch" + + let mut found_error_message = false; + let mut err_source = err.source(); + while let Some(source) = err_source { + if source.to_string() == "metadata checksum mismatch" { + found_error_message = true; + break; + } + err_source = source.source(); + } + assert!( + found_error_message, + "didn't find the corrupted metadata error" ); Ok(()) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 1e1ec716a6..e78b05695c 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -218,6 +218,10 @@ impl Layer for DeltaLayer { PathBuf::from(self.layer_name().to_string()) } + fn local_path(&self) -> Option { + Some(self.path()) + } + fn get_value_reconstruct_data( &self, key: Key, diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index d7657ecac6..c0c8e7789a 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -125,6 +125,10 @@ impl Layer for ImageLayer { PathBuf::from(self.layer_name().to_string()) } + fn local_path(&self) -> Option { + Some(self.path()) + } + fn get_tenant_id(&self) -> ZTenantId { self.tenantid } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 856baa2e8a..bffb946f7e 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -85,6 +85,10 @@ impl Layer for InMemoryLayer { )) } + fn local_path(&self) -> Option { + None + } + fn get_tenant_id(&self) -> ZTenantId { self.tenantid } diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 91a900dde0..7a2d0d5bcd 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -253,7 +253,7 @@ impl LayerMap { } } - pub fn iter_historic_layers(&self) -> std::slice::Iter> { + pub fn iter_historic_layers(&self) -> impl Iterator> { self.historic_layers.iter() } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index aad631c5c4..9fcc8907d3 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -105,6 +105,9 @@ pub trait Layer: Send + Sync { /// log messages, even though they're never not on disk.) fn filename(&self) -> PathBuf; + /// If a layer has a corresponding file on a local filesystem, return its path. + fn local_path(&self) -> Option; + /// /// Return data needed to reconstruct given page at LSN. /// diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index cfa09dce14..4db0f6667d 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -14,7 +14,7 @@ //! //! * public API via to interact with the external world: //! * [`start_local_timeline_sync`] to launch a background async loop to handle the synchronization -//! * [`schedule_timeline_checkpoint_upload`] and [`schedule_timeline_download`] to enqueue a new upload and download tasks, +//! * [`schedule_layer_upload`], [`schedule_layer_download`] and [`schedule_layer_delete`] to enqueue a new upload and download tasks, //! to be processed by the async loop //! //! Here's a schematic overview of all interactions backup and the rest of the pageserver perform: @@ -71,10 +71,10 @@ //! when the newer image is downloaded //! //! Pageserver maintains similar to the local file structure remotely: all layer files are uploaded with the same names under the same directory structure. -//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexShard`], containing the list of remote files. +//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexPart`], containing the list of remote files. //! This file gets read to populate the cache, if the remote timeline data is missing from it and gets updated after every successful download. //! This way, we optimize S3 storage access by not running the `S3 list` command that could be expencive and slow: knowing both [`ZTenantId`] and [`ZTimelineId`], -//! we can always reconstruct the path to the timeline, use this to get the same path on the remote storage and retrive its shard contents, if needed, same as any layer files. +//! we can always reconstruct the path to the timeline, use this to get the same path on the remote storage and retrive its part contents, if needed, same as any layer files. //! //! By default, pageserver reads the remote storage index data only for timelines located locally, to synchronize those, if needed. //! Bulk index data download happens only initially, on pageserer startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only, @@ -108,7 +108,7 @@ pub use self::{ storage_sync::{ download_index_part, index::{IndexPart, RemoteIndex, RemoteTimeline}, - schedule_timeline_checkpoint_upload, schedule_timeline_download, + schedule_layer_delete, schedule_layer_download, schedule_layer_upload, }, }; use crate::{ diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 2d3416cd32..127655ce87 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -427,10 +427,10 @@ pub struct TimelineDownload { /// On task failure, it gets retried again from the start a number of times. /// /// Ensure that the loop is started otherwise the task is never processed. -pub fn schedule_timeline_checkpoint_upload( +pub fn schedule_layer_upload( tenant_id: ZTenantId, timeline_id: ZTimelineId, - new_layer: PathBuf, + layers_to_upload: HashSet, metadata: TimelineMetadata, ) { if !sync_queue::push( @@ -439,7 +439,7 @@ pub fn schedule_timeline_checkpoint_upload( timeline_id, }, SyncTask::upload(TimelineUpload { - layers_to_upload: HashSet::from([new_layer]), + layers_to_upload, uploaded_layers: HashSet::new(), metadata, }), @@ -450,6 +450,14 @@ pub fn schedule_timeline_checkpoint_upload( } } +pub fn schedule_layer_delete( + _tenant_id: ZTenantId, + _timeline_id: ZTimelineId, + _layers_to_delete: HashSet, +) { + // TODO kb implement later +} + /// Requests the download of the entire timeline for a given tenant. /// No existing local files are currently overwritten, except the metadata file (if its disk_consistent_lsn is less than the downloaded one). /// The metadata file is always updated last, to avoid inconsistencies. @@ -457,8 +465,8 @@ pub fn schedule_timeline_checkpoint_upload( /// On any failure, the task gets retried, omitting already downloaded layers. /// /// Ensure that the loop is started otherwise the task is never processed. -pub fn schedule_timeline_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) { - debug!("Scheduling timeline download for tenant {tenant_id}, timeline {timeline_id}"); +pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) { + debug!("Scheduling layer download for tenant {tenant_id}, timeline {timeline_id}"); sync_queue::push( ZTenantTimelineId { tenant_id,