From e61732ca7cd23702c90f2dab984c9902db2a251c Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Tue, 2 Nov 2021 23:36:52 +0200 Subject: [PATCH] Compress checkpoint files before streaming into S3 --- Cargo.lock | 56 + pageserver/Cargo.toml | 1 + pageserver/README.md | 2 +- pageserver/src/remote_storage.rs | 23 +- pageserver/src/remote_storage/README.md | 22 +- pageserver/src/remote_storage/local_fs.rs | 2 +- pageserver/src/remote_storage/storage_sync.rs | 1513 ++++++----------- .../storage_sync/compression.rs | 628 +++++++ .../src/remote_storage/storage_sync/index.rs | 375 ++++ pageserver/src/tenant_mgr.rs | 22 +- 10 files changed, 1618 insertions(+), 1026 deletions(-) create mode 100644 pageserver/src/remote_storage/storage_sync/compression.rs create mode 100644 pageserver/src/remote_storage/storage_sync/index.rs diff --git a/Cargo.lock b/Cargo.lock index a7b5b21ae9..0df9930b80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,20 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +[[package]] +name = "async-compression" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443ccbb270374a2b1055fc72da40e1f237809cd6bb0e97e66d264cd138473a6" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", +] + [[package]] name = "async-trait" version = "0.1.51" @@ -249,6 +263,9 @@ name = "cc" version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -845,6 +862,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.55" @@ -1101,6 +1127,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "async-compression", "async-trait", "bookfile", "byteorder", @@ -2510,3 +2537,32 @@ dependencies = [ "workspace_hack", "zenith_metrics", ] + +[[package]] +name = "zstd" +version = "0.7.0+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9428752481d8372e15b1bf779ea518a179ad6c771cca2d2c60e4fbff3cc2cd52" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "3.1.0+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa1926623ad7fe406e090555387daf73db555b948134b4d73eac5eb08fb666d" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.5.0+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e6c094340240369025fc6b731b054ee2a834328fa584310ac96aa4baebdc465" +dependencies = [ + "cc", + "libc", +] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index eb83b128d3..64f825ded5 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -41,6 +41,7 @@ nix = "0.23" once_cell = "1.8.0" rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] } +async-compression = {version = "0.3", features = ["zstd", "tokio"]} 
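Worth noting for reviewers: the `async-compression` dependency added above (with the `zstd` and `tokio` features) is what allows compressing checkpoint data as a stream instead of buffering whole files in memory. Below is a minimal sketch of that kind of usage, independent of the patch's own `compression` module; the function and paths are illustrative only:

```rust
use async_compression::tokio::bufread::ZstdEncoder;
use std::path::Path;
use tokio::{
    fs::File,
    io::{self, AsyncWriteExt, BufReader},
};

/// Stream-compress `src` with zstd and write the result to `dst`.
/// The encoder implements `AsyncRead`, so the compressed bytes could just as
/// well be fed to an S3 uploader instead of a local file.
async fn compress_file(src: &Path, dst: &Path) -> io::Result<u64> {
    let mut encoder = ZstdEncoder::new(BufReader::new(File::open(src).await?));
    let mut target = File::create(dst).await?;
    let bytes_written = io::copy(&mut encoder, &mut target).await?;
    target.flush().await?;
    Ok(bytes_written)
}
```

Decompression is symmetrical via `ZstdDecoder` from the same crate.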
postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/pageserver/README.md b/pageserver/README.md index b812a9dba9..7d1758b1dc 100644 --- a/pageserver/README.md +++ b/pageserver/README.md @@ -9,7 +9,7 @@ The Page Server has a few different duties: S3 is the main fault-tolerant storage of all data, as there are no Page Server replicas. We use a separate fault-tolerant WAL service to reduce latency. It -keeps track of WAL records which are not syncted to S3 yet. +keeps track of WAL records which are not synced to S3 yet. The Page Server consists of multiple threads that operate on a shared repository of page versions: diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index be7d421dbc..93504a30e6 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -20,7 +20,7 @@ //! | pageserver | (schedule checkpoint upload) | upload/download | //! | | | loop | //! | | <------------------------------- | | -//! | | (register downloaded layers) | | +//! | | (register downloaded timelines) | | //! +------------------------+ +---------<-------+ //! | //! | @@ -36,24 +36,27 @@ //! | access to this storage | //! +------------------------+ //! -//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop unitialised, if configured so. +//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop uninitialised, if configured so. //! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata. -//! If the storage sync loop was successfully started before, pageserver schedules the new image uploads after every checkpoint. +//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint. //! See [`crate::layered_repository`] for the upload calls and the adjacent logic. //! //! The storage logic considers `image` as a set of local files, fully representing a certain timeline at given moment (identified with `disk_consistent_lsn`). //! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed //! by the storage upload, if enabled. -//! When a certain image gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same image state. +//! When a certain checkpoint gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same state. //! No files are deleted from either local or remote storage, only the missing ones locally/remotely get downloaded/uploaded, local metadata file will be overwritten -//! when the newer timeline is downloaded. +//! when the newer image is downloaded. //! //! Meanwhile, the loop inits the storage connection and checks the remote files stored. //! This is done once at startup only, relying on the fact that pageserver uses the storage alone (ergo, nobody else uploads the files to the storage but this server). -//! Based on the remote image data, the storage sync logic queues image downloads, while accepting any potential upload tasks from pageserver and managing the tasks by their priority. -//! On the image download, a [`crate::tenant_mgr::register_relish_download`] function is called to register the new image in pageserver, initializing all related threads and internal state. +//! 
Based on the remote storage data, the sync logic queues timeline downloads, while accepting any potential upload tasks from pageserver and managing the tasks by their priority. //! On the timeline download, a [`crate::tenant_mgr::register_timeline_download`] function is called to register the new timeline in pageserver, initializing all related threads and internal state. //! -//! When the pageserver terminates, the upload loop finishes a current image sync task (if any) and exits. +//! To optimize S3 storage (and access), the sync loop compresses the checkpoint files before placing them into S3, and decompresses them back, keeping track of timeline files and metadata. +//! Also, the remote file list is queried only once, at startup, to avoid possible extra costs and latency issues. +//! +//! When the pageserver terminates, the upload loop finishes a current sync task (if any) and exits. //! //! NOTES: //! * pageserver assumes it has exclusive write access to the remote storage. If supported, the way multiple pageservers can be separated in the same storage //! 2. pageserver loads the timeline from disk for the first time //! //! * the uploads do not happen right after the upload registration: the sync loop might be occupied with other tasks, or tasks with bigger priority could be waiting already -//! -//! * all synchronization tasks (including the public API to register uploads and downloads and the sync queue management) happens on an image scale: a big set of remote files, -//! enough to represent (and recover, if needed) a certain timeline state. On the contrary, all internal storage CRUD calls are made per reilsh file from those images. -//! This way, the synchronization is able to download the image partially, if some state was synced before, but exposes correctly synced images only. mod local_fs; mod rust_s3; diff --git a/pageserver/src/remote_storage/README.md b/pageserver/src/remote_storage/README.md index c8e4ad242a..21415a2d5e 100644 --- a/pageserver/src/remote_storage/README.md +++ b/pageserver/src/remote_storage/README.md @@ -18,6 +18,9 @@ Current implementation * provides remote storage wrappers for AWS S3 and local FS * uploads layers, frozen by pageserver checkpoint thread * downloads and registers layers, found on the remote storage, but missing locally +* uses compression when dealing with files, for better S3 usage +* maintains an index of what's stored remotely +* evicts failing tasks and stops the corresponding timelines No good optimisations or performance testing is done, the feature is disabled by default and gets polished over time. It's planned to deal with all questions that are currently on and prepare the feature to be enabled by default in cloud environments. @@ -27,21 +30,16 @@ It's planned to deal with all questions that are currently on and prepare the fe As mentioned, the backup component is rather new and under development currently, so not all things are done properly from the start. Here's the list of known compromises with comments: -* Remote storage model is the same as the `tenants/` directory contents of the pageserver's local workdir storage. -This is relatively simple to implement, but may be costly to use in AWS S3: an initial data image contains ~782 relish files and a metadata file, ~31 MB combined. -AWS charges per API call and for traffic either, layers are expected to be updated frequently, so this model most probably is ineffective.
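To make the "maintains an index of what's stored remotely" item above more concrete, here is a deliberately simplified model of such a per-timeline index. The real structure lives in `storage_sync/index.rs` introduced by this patch (not shown in this hunk); the names and fields below are assumptions for illustration, not its actual API:

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;

/// `disk_consistent_lsn` of the checkpoint an archive was produced from;
/// it doubles as the archive identifier, since it is part of the archive name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ArchiveId(u64);

/// What the sync loop remembers about a single timeline's remote state.
#[derive(Debug, Default)]
struct TimelineIndex {
    /// Relative paths of the files packed into every uploaded archive.
    archives: BTreeMap<ArchiveId, BTreeSet<PathBuf>>,
}

impl TimelineIndex {
    /// Record a successfully uploaded checkpoint archive.
    fn register_archive(&mut self, id: ArchiveId, files: BTreeSet<PathBuf>) {
        self.archives.insert(id, files);
    }

    /// Reject re-uploads of a checkpoint whose Lsn is already stored remotely.
    fn contains_archive(&self, id: ArchiveId) -> bool {
        self.archives.contains_key(&id)
    }

    /// Every file known to exist remotely, used to skip duplicate uploads
    /// and to avoid re-downloading files that are already present.
    fn stored_files(&self) -> BTreeSet<PathBuf> {
        self.archives.values().flatten().cloned().collect()
    }
}
```

The actual index additionally keeps each archive's header and header size, so a download can skip the header bytes and unpack only the files it still needs.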
-Additionally, pageservers might need to migrate images between tenants, which does not improve the situation. +* The remote storage file model is currently a custom archive format that cannot be deserialized without our particular Rust code (including `serde`). +We also don't optimize the archiving and pack every timeline checkpoint separately, so the size of the resulting blob that ends up on S3 could be arbitrary. +Still, it's a single blob, which is way better than storing ~780 small files separately. -Storage sync API operates images when backing up or restoring a backup, so we're fluent to repack the layer contents the way we want to, which most probably will be done later. +* Archive index restoration requires reading every blob's header. +This could be avoided by a background thread/future storing the serialized index in the remote storage. * no proper file comparison -Currently, every layer contains `Lsn` in their name, to map the data it holds against a certain DB state. -Then the images with same ids and different `Lsn`'s are compared, files are considered equal if their local file paths are equal (for remote files, "local file path" is their download destination). -No file contents assertion is done currently, but should be. -AWS S3 returns file checksums during the `list` operation, so that can be used to ensure the backup consistency, but that needs further research and, since current pageserver impl also needs to deal with layer file checksums. - -For now, due to this, we consider local workdir files as source of truth, not removing them ever and adjusting remote files instead, if image files mismatch. +No file checksum assertion is done currently, but should be (AWS S3 returns file checksums during the `list` operation). * sad rust-s3 api @@ -60,7 +58,7 @@ Based on previous evaluation, even `rusoto-s3` could be a better choice over thi So far, we don't consider non-main images and don't adjust the remote storage based on GC thread loop results. Only checkpointer loop affects the remote storage. -* more layers should be downloaded on demand +* more timelines should be downloaded on demand Since we download and load remote layers into pageserver, there's a possibility a need for those layers' ancestors arise. Most probably, every downloaded image's ancestor is not present in locally too, but currently there's no logic for downloading such ancestors and their metadata, diff --git a/pageserver/src/remote_storage/local_fs.rs b/pageserver/src/remote_storage/local_fs.rs index e9651988da..9308e9b008 100644 --- a/pageserver/src/remote_storage/local_fs.rs +++ b/pageserver/src/remote_storage/local_fs.rs @@ -355,7 +355,7 @@ mod pure_tests { .local_path( &storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?) ) - .expect("For a valid input, valid S3 info should be parsed"), + .expect("For a valid input, valid local path should be parsed"), "Should be able to parse metadata out of the correctly named remote delta file" ); diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index f10304fbd4..f5e06a17c4 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -2,10 +2,11 @@ //! //! The synchronization does not aim to be immediate, instead //! doing all the job in a separate thread asynchronously, attempting to fully replicate the -//! pageserver timeline workdir data on the remote storage. +//! 
pageserver timeline workdir data on the remote storage in a custom format, better suited for storing. //! -//! [`SYNC_QUEUE`] is a priority queue to hold [`SyncTask`] for image upload/download. -//! The queue gets emptied by a single thread with the loop, that polls the tasks one by one. +//! [`SYNC_QUEUE`] is a deque to hold [`SyncTask`] for image upload/download. +//! The queue gets emptied by a single thread running the loop, which polls the tasks in batches (the batch size is configurable). +//! Every task in a batch is processed concurrently, which is possible due to the incremental nature of the timelines. //! //! During the loop startup, an initial loop state is constructed from all remote storage entries. //! It's enough to poll the remote state once on startup only, due to agreement that the pageserver has //! After the initial state is loaded into memory and the loop starts, any further [`Err`] results do not stop the loop, but rather //! reschedules the same task, with possibly less files to sync in it. //! -//! The synchronization unit is an image: a set of layer files (or relishes) and a special metadata file. -//! Both upload and download tasks consider image in a similar way ([`LocalTimeline`] and [`RemoteTimeline`]): -//! * a set of relishes (both upload and download tasks store the files as local pageserver paths, ergo [`PathBuf`] is used). -//! * a set of ids to distinguish the images ([`ZTenantId`] and [`ZTimelineId`]) -//! * `disk_consistent_lsn` which indicates the last [`Lsn`] applicable to the data stored in this image. +//! The synchronization unit is an archive: a set of timeline files (or relishes) and a special metadata file, all compressed into a blob. +//! An archive contains a set of files of a certain timeline, added during checkpoint(s), and the timeline metadata at that moment. +//! The archive carries that metadata's `disk_consistent_lsn` in its name, to be able to restore partial index information from just a remote storage file list. +//! The index is created at startup (possible due to exclusive ownership over the remote storage by the pageserver) and keeps track of which files were stored +//! in what remote archives. +//! Among other tasks, the index is used to prevent invalid uploads and downloads of non-existent archives on demand. +//! Refer to [`compression`] and [`index`] for more details on the archives and index respectively. //! -//! The same relish has identical layer paths in both structs, since both represent the relish path in pageserver's workdir. -//! This way, the sync can compare remote and local images seamlessly, downloading/uploading missing files if needed. +//! After pageserver performs a successful image checkpoint and produces new local files, it schedules an upload with +//! the list of the files and its metadata file contents at the moment of checkpointing. +//! Pageserver needs both the file list and metadata to load the timeline, so both are mandatory for the upload; that's why the uploads happen after checkpointing. +//! Not every upload of the same timeline gets processed: if `disk_consistent_lsn` did not change during a checkpoint for some reason, the remote data is not updated. //! -//! After pageserver parforms a succesful image checkpoint and detects that image state had updated, it reports an upload with -//! the list of image new files and its incremented `disk_consistent_lsn` (that also gets stored into image metadata file). -//! 
Both the file list and `disk_consistent_lsn` are mandatory for the upload, that's why the uploads happen after checkpointing. -//! Timelines with no such [`Lsn`] cannot guarantee their local file consistency and are not considered for backups. -//! Not every upload of the same timeline gets processed: if `disk_consistent_lsn` is unchanged, the remote timeline is not updated. +//! Current uploads are per-checkpoint and don't accumulate data into blobs of an optimal size for storing on S3. +//! The upload is atomic and gets rescheduled entirely if it fails along the way. +//! The downloads are per-timeline and download all missing timeline files. +//! The archives get processed sequentially, from smaller `disk_consistent_lsn` to larger, with metadata files being added last. +//! If processing of any archive fails along the way, all the remaining archives are rescheduled for the next attempt. +//! There's a reschedule threshold that evicts tasks that fail too many times and stops the corresponding timeline so it does not diverge from the state on the remote storage. +//! The archive unpacking is designed to unpack metadata as the last file, so the risk of leaving a corrupt timeline behind due to a decompression error is small (while not eliminated entirely, which should be improved). //! -//! Remote timelines may lack `disk_consistent_lsn` if their metadata file is corrupt or missing. -//! Such timelines are not downloaded and their layer paths are entirely replaced with the ones from a newer upload for the same timeline. -//! Intact remote timelines are stored in the sync loop memory to avoid duplicate reuploads and then get queried for downloading, if no -//! timeline with the same id is found in the local workdir already. -//! -//! Besides all sync tasks operating images, internally every image is split to its underlying relish files which are synced independently. -//! The sync logic does not distinguish the relishes between each other, uploading/downloading them all via [`FuturesUnordered`] and registering all failures. -//! A single special exception is a metadata file, that is always uploaded/downloaded last (source images with no metadata are ignored), only after the rest -//! of the relishes are successfully synced. -//! If there are relish or metadata sync errors, the task gets resubmitted with all failed layers only, with all the successful layers stored in the loop state. -//! NOTE: No backpressure or eviction is implemented for tasks that always fail, it will be improved later. -//! -//! Synchronization never removes any local from pageserver workdir or remote files from the remote storage: the files from previous -//! uploads that are not mentioned in the new upload lists, are still considered as part of the corresponding image. -//! When determining which files to upload/download, the local file paths (for remote files, that is the same as their download destination) is compared, -//! and two files are considered "equal", if their paths match. Such files are uploaded/downloaded over, no real contents checks are done. +//! Synchronization never removes any local files from the pageserver workdir or remote files from the remote storage, yet there could be overwrites of the same files (metadata file updates; archive redownloads). //! NOTE: No real contents or checksum check happens right now and is a subject to improve later. //! //! After the whole timeline is downloaded, [`crate::tenant_mgr::register_timeline_download`] function is used to register the image in pageserver. //! 
When pageserver signals shutdown, current sync task gets finished and the loop exists. //! //! Currently there's no other way to download a remote relish if it was not downloaded after initial remote storage files check. -//! This is a subject to change in the near future, but requires more changes to [`crate::tenant_mgr`] before it can happen. +//! This is a subject to change in the near future. + +mod compression; +pub mod index; use std::{ - collections::{hash_map, HashMap, HashSet}, + collections::{BTreeSet, HashMap}, num::{NonZeroU32, NonZeroUsize}, - ops::DerefMut, - path::{Path, PathBuf}, + ops::{Deref, DerefMut}, + path::PathBuf, sync::Arc, thread, }; @@ -70,31 +65,26 @@ use std::{ use anyhow::{anyhow, ensure, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; +use tokio::sync::RwLock; use tokio::{ - fs, - io::{self, AsyncWriteExt}, - sync::{ - mpsc::{self, UnboundedReceiver}, - Mutex, - }, + sync::mpsc::{self, UnboundedReceiver}, time::Instant, }; use tracing::*; +use self::{ + compression::ArchiveHeader, + index::{ArchiveId, RemoteTimeline}, +}; use super::{RemoteStorage, TimelineSyncId}; use crate::{ - layered_repository::{ - metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}, - TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME, - }, - tenant_mgr::{perform_post_timeline_sync_steps, PostTimelineSyncStep}, + layered_repository::metadata::TimelineMetadata, + tenant_mgr::{perform_post_timeline_sync_steps, TimelineRegistration}, PageServerConf, }; + use zenith_metrics::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge}; -use zenith_utils::{ - lsn::Lsn, - zid::{ZTenantId, ZTimelineId}, -}; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { static ref REMAINING_SYNC_ITEMS: IntGauge = register_int_gauge!( @@ -224,8 +214,8 @@ impl SyncTask { #[derive(Debug, PartialEq, Eq, Clone)] enum SyncKind { - /// Regular image download, that is not critical for running, but still needed. - Download(RemoteTimeline), + /// A certain amount of images (archive files) to download. + Download(TimelineDownload), /// A checkpoint outcome with possible local file updates that need actualization in the remote storage. /// Not necessary more fresh than the one already uploaded. Upload(NewCheckpoint), @@ -242,23 +232,9 @@ struct NewCheckpoint { /// Info about the remote image files. #[derive(Clone, Debug, PartialEq, Eq)] -struct RemoteTimeline { - /// Same paths as in [`LocalTimeline`], pointing at the download - /// destination of every of the remote timeline layers. - layers: Vec, - /// If metadata file is uploaded, the corresponding field from this file. - /// On the contrast with [`LocalTimeline`], remote timeline's metadata may be missing - /// due to various upload errors or abrupt pageserver shutdowns that obstructed - /// the file storing. - metadata: Option, -} - -impl RemoteTimeline { - fn disk_consistent_lsn(&self) -> Option { - self.metadata - .as_ref() - .map(|meta| meta.disk_consistent_lsn()) - } +struct TimelineDownload { + files_to_skip: Arc>, + archives_to_download: Vec, } /// Adds the new checkpoint files as an upload sync task to the queue. 
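Before the loop implementation below, the batching and retry policy from the module doc can be boiled down to a small sketch. This is illustrative only: the real `sync_queue` also reports metrics, and task eviction additionally stops the corresponding timeline; `Task`, `next_batch` and `reschedule` are made-up names:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct Task {
    retries: u32,
    // the upload/download payload is elided in this sketch
}

/// Take up to `max_batch` tasks for one loop iteration, dropping (evicting)
/// the ones that exceeded the retry threshold so a permanently failing task
/// cannot occupy the loop forever.
fn next_batch(queue: &Mutex<VecDeque<Task>>, max_batch: usize, max_errors: u32) -> Vec<Task> {
    let mut queue = queue.lock().unwrap();
    let mut batch = Vec::with_capacity(max_batch);
    while batch.len() < max_batch {
        match queue.pop_front() {
            Some(task) if task.retries > max_errors => continue, // evict
            Some(task) => batch.push(task),
            None => break, // queue drained for now
        }
    }
    batch
}

/// A failed task goes back to the queue with an incremented retry counter.
fn reschedule(queue: &Mutex<VecDeque<Task>>, mut task: Task) {
    task.retries += 1;
    queue.lock().unwrap().push_back(task);
}
```

The tasks inside one batch can then be processed concurrently, as the module doc notes, because each archive only adds data on top of what earlier archives already contain.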
@@ -337,19 +313,20 @@ fn storage_sync_loop< .enable_all() .build()?; let remote_timelines = runtime - .block_on(fetch_existing_uploads(&remote_storage)) + .block_on(index::reconstruct_from_storage(&remote_storage)) .context("Failed to determine previously uploaded timelines")?; schedule_first_tasks(config, &remote_timelines); - // placing the two variables, shared between the async loop tasks. Main reason for using `Arc` is `tokio::spawn` with its `'static` requirements. - let remote_timelines_and_storage = Arc::new((Mutex::new(remote_timelines), remote_storage)); - + // TODO kb return it back under a single Arc? + let remote_storage = Arc::new(remote_storage); + let remote_timelines = Arc::new(RwLock::new(remote_timelines)); while !crate::tenant_mgr::shutdown_requested() { let registration_steps = runtime.block_on(loop_step( config, &mut receiver, - Arc::clone(&remote_timelines_and_storage), + Arc::clone(&remote_storage), + Arc::clone(&remote_timelines), max_concurrent_sync, max_sync_errors, )); @@ -367,10 +344,11 @@ async fn loop_step< >( config: &'static PageServerConf, receiver: &mut UnboundedReceiver, - remote_timelines_and_storage: Arc<(Mutex>, S)>, + remote_storage: Arc, + remote_timelines: Arc>>, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, -) -> HashMap<(ZTenantId, ZTimelineId), PostTimelineSyncStep> { +) -> HashMap<(ZTenantId, ZTimelineId), TimelineRegistration> { let max_concurrent_sync = max_concurrent_sync.get(); let mut next_tasks = Vec::with_capacity(max_concurrent_sync); @@ -389,7 +367,7 @@ async fn loop_step< let remaining_queue_length = sync_queue::len(); debug!( - "Processing {} tasks, more tasks left to process: {}", + "Processing {} tasks in batch, more tasks left to process: {}", next_tasks.len(), remaining_queue_length ); @@ -401,9 +379,10 @@ async fn loop_step< let sync_id = task.sync_id; let extra_step = match tokio::spawn(process_task( config, - Arc::clone(&remote_timelines_and_storage), + Arc::clone(&remote_storage), task, max_sync_errors, + Arc::clone(&remote_timelines), )) .await { @@ -435,24 +414,22 @@ async fn loop_step< extra_sync_steps } -type TaskSharedRemotes = (Mutex>, S); - async fn process_task< P: std::fmt::Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, >( config: &'static PageServerConf, - remote_timelines_and_storage: Arc>, + remote_storage: Arc, task: SyncTask, max_sync_errors: NonZeroU32, -) -> Option { - let (remote_timelines, remote_storage) = remote_timelines_and_storage.as_ref(); + remote_timelines: Arc>>, +) -> Option { if task.retries > max_sync_errors.get() { error!( "Evicting task {:?} that failed {} times, exceeding the error theshold", task.kind, task.retries ); - return Some(PostTimelineSyncStep::Evict); + return Some(TimelineRegistration::Evict); } if task.retries > 0 { @@ -469,20 +446,21 @@ async fn process_task< SyncKind::Download(download_data) => { let sync_status = download_timeline( config, - remote_storage, + remote_timelines.read().await.deref(), + Arc::clone(&remote_storage), task.sync_id, download_data, task.retries + 1, ) .await; register_sync_status(sync_start, "download", sync_status); - Some(PostTimelineSyncStep::RegisterDownload) + Some(TimelineRegistration::Download) } SyncKind::Upload(layer_upload) => { - let sync_status = upload_timeline( + let sync_status = upload_timeline_checkpoint( config, - remote_timelines.lock().await.deref_mut(), - remote_storage, + remote_timelines.write().await.deref_mut(), + Arc::clone(&remote_storage), task.sync_id, layer_upload, task.retries + 
1, @@ -498,31 +476,23 @@ fn schedule_first_tasks( config: &'static PageServerConf, remote_timelines: &HashMap, ) { - latest_timelines(remote_timelines) - .iter() - .filter(|sync_id| { - let exists_locally = config.timeline_path(&sync_id.1, &sync_id.0).exists(); - if exists_locally { - debug!( - "Timeline with tenant id {}, timeline id {} exists locally, not downloading", - sync_id.0, sync_id.1 - ); - false - } else { - true - } - }) - .filter_map(|&sync_id| { - let remote_timeline = remote_timelines.get(&sync_id)?; - Some(SyncTask::new( + for (&sync_id, timeline) in remote_timelines { + if !config.timeline_path(&sync_id.1, &sync_id.0).exists() { + sync_queue::push(SyncTask::new( sync_id, 0, - SyncKind::Download(remote_timeline.clone()), - )) - }) - .for_each(|task| { - sync_queue::push(task); - }); + SyncKind::Download(TimelineDownload { + files_to_skip: Arc::new(BTreeSet::new()), + archives_to_download: timeline.stored_archives(), + }), + )); + } else { + debug!( + "Timeline with tenant id {}, timeline id {} exists locally, not downloading", + sync_id.0, sync_id.1 + ); + } + } } fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option) { @@ -536,530 +506,213 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Optio .observe(secs_elapsed) } -fn latest_timelines( +async fn download_timeline< + P: Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +>( + config: &'static PageServerConf, remote_timelines: &HashMap, -) -> HashSet { - let mut latest_timelines_for_tenants = HashMap::with_capacity(remote_timelines.len()); - for (&sync_id, remote_timeline_data) in remote_timelines { - let (latest_timeline_id, timeline_metadata) = latest_timelines_for_tenants - .entry(sync_id.0) - .or_insert_with(|| (sync_id.1, remote_timeline_data.metadata.clone())); - if latest_timeline_id != &sync_id.1 - && timeline_metadata - .as_ref() - .map(|metadata| metadata.disk_consistent_lsn()) - < remote_timeline_data.disk_consistent_lsn() - { - *latest_timeline_id = sync_id.1; - *timeline_metadata = remote_timeline_data.metadata.clone(); - } - } - - latest_timelines_for_tenants - .into_iter() - .map(|(tenant_id, (timeline_id, _))| TimelineSyncId(tenant_id, timeline_id)) - .collect() -} - -async fn fetch_existing_uploads>( - remote_storage: &S, -) -> anyhow::Result> { - let uploaded_files = remote_storage - .list() - .await - .context("Failed to list the uploads")?; - - let mut data_fetches = uploaded_files - .into_iter() - .map(|remote_path| async { - let local_path = match remote_storage.local_path(&remote_path) { - Ok(path) => path, - Err(e) => return Err((e, remote_path)), - }; - let metadata = if local_path - .file_name() - .and_then(|os_str| os_str.to_str()) - .unwrap_or_default() - == METADATA_FILE_NAME - { - let mut metadata_bytes = Vec::new(); - if let Err(e) = remote_storage - .download(&remote_path, &mut metadata_bytes) - .await - { - return Err((e, remote_path)); - }; - let metadata = match TimelineMetadata::from_bytes(&metadata_bytes) { - Ok(metadata) => metadata, - Err(e) => return Err((e, remote_path)), - }; - Some(metadata) - } else { - None - }; - let sync_id = parse_sync_id(&local_path).map_err(|e| (e, remote_path))?; - Ok::<_, (anyhow::Error, P)>((local_path, sync_id, metadata)) - }) - .collect::>(); - - let mut fetched = HashMap::new(); - while let Some(fetch_result) = data_fetches.next().await { - match fetch_result { - Ok((local_path, sync_id, remote_metadata)) => { - let remote_timeline = fetched.entry(sync_id).or_insert_with(|| 
RemoteTimeline { - layers: Vec::new(), - metadata: None, - }); - if remote_metadata.is_some() { - remote_timeline.metadata = remote_metadata; - } else { - remote_timeline.layers.push(local_path); - } - } - Err((e, remote_path)) => { - warn!( - "Failed to fetch file info for path {:?}, reason: {:#}", - remote_path, e - ); - continue; - } - } - } - - Ok(fetched) -} - -fn parse_sync_id(path: &Path) -> anyhow::Result { - let mut segments = path - .iter() - .flat_map(|segment| segment.to_str()) - .skip_while(|&segment| segment != TENANTS_SEGMENT_NAME); - let tenants_segment = segments.next().ok_or_else(|| { - anyhow!( - "Found no '{}' segment in the storage path '{}'", - TENANTS_SEGMENT_NAME, - path.display() - ) - })?; - ensure!( - tenants_segment == TENANTS_SEGMENT_NAME, - "Failed to extract '{}' segment from storage path '{}'", - TENANTS_SEGMENT_NAME, - path.display() - ); - let tenant_id = segments - .next() - .ok_or_else(|| { - anyhow!( - "Found no tenant id in the storage path '{}'", - path.display() - ) - })? - .parse::() - .with_context(|| { - format!( - "Failed to parse tenant id from storage path '{}'", - path.display() - ) - })?; - - let timelines_segment = segments.next().ok_or_else(|| { - anyhow!( - "Found no '{}' segment in the storage path '{}'", - TIMELINES_SEGMENT_NAME, - path.display() - ) - })?; - ensure!( - timelines_segment == TIMELINES_SEGMENT_NAME, - "Failed to extract '{}' segment from storage path '{}'", - TIMELINES_SEGMENT_NAME, - path.display() - ); - let timeline_id = segments - .next() - .ok_or_else(|| { - anyhow!( - "Found no timeline id in the storage path '{}'", - path.display() - ) - })? - .parse::() - .with_context(|| { - format!( - "Failed to parse timeline id from storage path '{}'", - path.display() - ) - })?; - - Ok(TimelineSyncId(tenant_id, timeline_id)) -} - -async fn download_timeline>( - config: &'static PageServerConf, - remote_storage: &S, + remote_storage: Arc, sync_id: TimelineSyncId, - remote_timeline: RemoteTimeline, - current_retry: u32, + mut download: TimelineDownload, + retries: u32, ) -> Option { - debug!("Downloading layers for timeline {}", sync_id.1); + let TimelineSyncId(tenant_id, timeline_id) = sync_id; + debug!( + "Downloading layers for tenant {}, timeline {}", + tenant_id, timeline_id + ); - let new_metadata = if let Some(metadata) = remote_timeline.metadata { - metadata - } else { - warn!("Remote timeline incomplete: no metadata found, aborting the download"); - return None; - }; - debug!("Downloading {} layers", remote_timeline.layers.len()); + let remote_timeline = remote_timelines.get(&sync_id)?; - let sync_result = synchronize_layers( - config, - remote_storage, - remote_timeline.layers.into_iter(), - SyncOperation::Download, - &new_metadata, - sync_id, - ) - .await; - - match sync_result { - SyncResult::Success { .. } => Some(true), - SyncResult::MetadataSyncError { .. } => { - let download = RemoteTimeline { - layers: Vec::new(), - metadata: Some(new_metadata), - }; - sync_queue::push(SyncTask::new( - sync_id, - current_retry, - SyncKind::Download(download), - )); - Some(false) - } - SyncResult::LayerSyncError { not_synced, .. 
} => { - let download = RemoteTimeline { - layers: not_synced, - metadata: Some(new_metadata), - }; - sync_queue::push(SyncTask::new( - sync_id, - current_retry, - SyncKind::Download(download), - )); - Some(false) - } - } -} - -async fn upload_timeline<'a, P, S: 'static + RemoteStorage>( - config: &'static PageServerConf, - remote_timelines: &'a mut HashMap, - remote_storage: &'a S, - sync_id: TimelineSyncId, - mut new_upload: NewCheckpoint, - current_retry: u32, -) -> Option { - debug!("Uploading layers for timeline {}", sync_id.1); - - if let hash_map::Entry::Occupied(o) = remote_timelines.entry(sync_id) { - let uploaded_timeline_files = o.get(); - let uploaded_layers = uploaded_timeline_files - .layers - .iter() - .collect::>(); - new_upload - .layers - .retain(|path_to_upload| !uploaded_layers.contains(path_to_upload)); - match &uploaded_timeline_files.metadata { - None => debug!("Partially uploaded timeline found, downloading missing files only"), - Some(remote_metadata) => { - let new_lsn = new_upload.metadata.disk_consistent_lsn(); - let remote_lsn = remote_metadata.disk_consistent_lsn(); - if new_lsn <= remote_lsn { - warn!( - "Received a timeline witn LSN {} that's not later than the one from remote storage {}, not uploading", - new_lsn, remote_lsn - ); - return None; - } else { - debug!( - "Received a timeline with newer LSN {} (storage LSN {}), updating the upload", - new_lsn, remote_lsn - ) - } - } - } - } - - let NewCheckpoint { - layers: new_layers, - metadata: new_metadata, - .. - } = new_upload; - let sync_result = synchronize_layers( - config, - remote_storage, - new_layers.into_iter(), - SyncOperation::Upload, - &new_metadata, - sync_id, - ) - .await; - - let entry_to_update = remote_timelines - .entry(sync_id) - .or_insert_with(|| RemoteTimeline { - layers: Vec::new(), - metadata: Some(new_metadata.clone()), - }); - match sync_result { - SyncResult::Success { synced } => { - entry_to_update.layers.extend(synced.into_iter()); - entry_to_update.metadata = Some(new_metadata); - Some(true) - } - SyncResult::MetadataSyncError { synced } => { - entry_to_update.layers.extend(synced.into_iter()); - sync_queue::push(SyncTask::new( - sync_id, - current_retry, - SyncKind::Upload(NewCheckpoint { - layers: Vec::new(), - metadata: new_metadata, - }), - )); - Some(false) - } - SyncResult::LayerSyncError { synced, not_synced } => { - entry_to_update.layers.extend(synced.into_iter()); - sync_queue::push(SyncTask::new( - sync_id, - current_retry, - SyncKind::Upload(NewCheckpoint { - layers: not_synced, - metadata: new_metadata, - }), - )); - Some(false) - } - } -} - -/// Layer sync operation kind. -/// -/// This enum allows to unify the logic for image uploads and downloads. -/// When image's layers are synchronized, the only difference -/// between downloads and uploads is the [`RemoteStorage`] method we need to call. -#[derive(Debug, Copy, Clone)] -enum SyncOperation { - Download, - Upload, -} - -/// Image sync result. -#[derive(Debug)] -enum SyncResult { - /// All regular files are synced (their paths returned). - /// Metadata file is synced too (path not returned). - Success { synced: Vec }, - /// All regular files are synced (their paths returned). - /// Metadata file is not synced (path not returned). - MetadataSyncError { synced: Vec }, - /// Some regular files are not synced, some are (paths returned). - /// Metadata file is not synced (path not returned). 
- LayerSyncError { - synced: Vec, - not_synced: Vec, - }, -} - -/// Synchronizes given layers and metadata contents of a certain image. -/// Regular files are always synced before metadata files are, the latter gets synced only if -/// the rest of the files are successfully processed. -#[allow(clippy::too_many_arguments)] -async fn synchronize_layers<'a, P, S: 'static + RemoteStorage>( - config: &'static PageServerConf, - remote_storage: &'a S, - layers: impl Iterator, - sync_operation: SyncOperation, - new_metadata: &'a TimelineMetadata, - sync_id: TimelineSyncId, -) -> SyncResult { - let mut sync_operations = layers - .into_iter() - .map(|layer_path| async move { - let sync_result = match sync_operation { - SyncOperation::Download => download(remote_storage, &layer_path).await, - SyncOperation::Upload => upload(remote_storage, &layer_path).await, - }; - (layer_path, sync_result) - }) - .collect::>(); - - let mut synced = Vec::new(); - let mut not_synced = Vec::new(); - while let Some((layer_path, layer_download_result)) = sync_operations.next().await { - match layer_download_result { - Ok(()) => synced.push(layer_path), - Err(e) => { - error!( - "Failed to sync ({:?}) layer with local path '{}', reason: {:#}", - sync_operation, - layer_path.display(), - e, - ); - not_synced.push(layer_path); - } - } - } - - if not_synced.is_empty() { - debug!( - "Successfully synced ({:?}) all {} layers", - sync_operation, - synced.len(), - ); - trace!("Synced layers: {:?}", synced); - match sync_metadata( - config, - remote_storage, - sync_operation, - new_metadata, - sync_id, + let archives_total = download.archives_to_download.len(); + debug!("Downloading {} archives of a timeline", archives_total); + while let Some(archive_id) = download.archives_to_download.pop() { + if let Err(e) = try_download_archive( + Arc::clone(&remote_storage), + config.timeline_path(&timeline_id, &tenant_id), + remote_timeline, + archive_id, + Arc::clone(&download.files_to_skip), ) .await { - Ok(()) => { - debug!("Metadata file synced successfully"); - SyncResult::Success { synced } - } - Err(e) => { - error!( - "Failed to sync ({:?}) new metadata, reason: {:#}", - sync_operation, e - ); - SyncResult::MetadataSyncError { synced } - } + // add the failed archive back + download.archives_to_download.push(archive_id); + let archives_left = download.archives_to_download.len(); + error!( + "Failed to download archive {:?} for tenant {} timeline {} : {:#}, requeueing the download ({} archives left out of {})", + archive_id, tenant_id, timeline_id, e, archives_left, archives_total + ); + sync_queue::push(SyncTask::new( + sync_id, + retries, + SyncKind::Download(download), + )); + return Some(false); } - } else { - SyncResult::LayerSyncError { synced, not_synced } } + debug!("Finished downloading all timeline's archives"); + Some(true) } -async fn sync_metadata<'a, P, S: 'static + RemoteStorage>( - config: &'static PageServerConf, - remote_storage: &'a S, - sync_operation: SyncOperation, - new_metadata: &'a TimelineMetadata, - sync_id: TimelineSyncId, +async fn try_download_archive< + P: Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +>( + remote_storage: Arc, + timeline_dir: PathBuf, + remote_timeline: &RemoteTimeline, + archive_id: ArchiveId, + files_to_skip: Arc>, ) -> anyhow::Result<()> { - debug!("Synchronizing ({:?}) metadata file", sync_operation); + debug!("Downloading archive {:?}", archive_id); + let remote_archive = remote_timeline + .archive_data(archive_id) + .ok_or_else(|| anyhow!("Archive {:?} not 
found in remote storage", archive_id))?; + let (archive_header, header_size) = remote_timeline + .restore_header(archive_id) + .context("Failed to restore header when downloading an archive")?; - let local_metadata_path = metadata_path(config, sync_id.1, sync_id.0); - let new_metadata_bytes = new_metadata.to_bytes()?; - match sync_operation { - SyncOperation::Download => { - tokio::fs::write(&local_metadata_path, new_metadata_bytes).await?; - tokio::fs::File::open( - local_metadata_path - .parent() - .expect("Metadata should always have a parent"), - ) - .await? - .sync_all() - .await?; - } - SyncOperation::Upload => { - let remote_path = remote_storage - .storage_path(&local_metadata_path) - .with_context(|| { - format!( - "Failed to get remote storage path for local metadata path '{}'", - local_metadata_path.display() - ) - })?; + compression::uncompress_file_stream_with_index( + timeline_dir.clone(), + files_to_skip, + remote_archive.disk_consistent_lsn(), + archive_header, + header_size, + move |mut archive_target, archive_name| async move { + let archive_local_path = timeline_dir.join(&archive_name); remote_storage - .upload( - io::BufReader::new(std::io::Cursor::new(new_metadata_bytes)), - &remote_path, + .download_range( + &remote_storage.storage_path(&archive_local_path)?, + header_size, + None, + &mut archive_target, ) - .await?; - } - } + .await + }, + ) + .await?; + Ok(()) } -async fn upload>( - remote_storage: &S, - source: &Path, -) -> anyhow::Result<()> { - let destination = remote_storage.storage_path(source).with_context(|| { - format!( - "Failed to derive storage destination out of upload path {}", - source.display() - ) - })?; - let source_file = io::BufReader::new( - tokio::fs::OpenOptions::new() - .read(true) - .open(source) - .await - .with_context(|| { - format!( - "Failed to open target s3 destination at {}", - source.display() - ) - })?, +async fn upload_timeline_checkpoint< + P: Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +>( + config: &'static PageServerConf, + remote_timelines: &mut HashMap, + remote_storage: Arc, + sync_id: TimelineSyncId, + new_checkpoint: NewCheckpoint, + retries: u32, +) -> Option { + let TimelineSyncId(tenant_id, timeline_id) = sync_id; + debug!( + "Uploading checkpoint for tenant {}, timeline {}", + tenant_id, timeline_id ); - remote_storage.upload(source_file, &destination).await + let remote_timeline = remote_timelines.get(&sync_id); + + let new_upload_lsn = new_checkpoint.metadata.disk_consistent_lsn(); + let already_contains_upload_lsn = remote_timeline + .map(|remote_timeline| remote_timeline.contains_archive(new_upload_lsn)) + .unwrap_or(false); + if already_contains_upload_lsn { + warn!( + "Received a checkpoint witn Lsn {} that's already been uploaded to remote storage, skipping the upload.", + new_upload_lsn + ); + return None; + } + + let timeline_dir = config.timeline_path(&timeline_id, &tenant_id); + let already_uploaded_files = remote_timeline + .map(|timeline| timeline.stored_files(&timeline_dir)) + .unwrap_or_default(); + match try_upload_checkpoint( + config, + remote_storage, + sync_id, + &new_checkpoint, + already_uploaded_files, + ) + .await + { + Ok((archive_header, header_size)) => { + remote_timelines + .entry(sync_id) + .or_insert_with(RemoteTimeline::empty) + .set_archive_contents( + new_checkpoint.metadata.disk_consistent_lsn(), + archive_header, + header_size, + ); + debug!("Checkpoint uploaded successfully"); + Some(true) + } + Err(e) => { + error!( + "Failed to upload checkpoint: {:#}, 
requeueing the upload", + e + ); + sync_queue::push(SyncTask::new( + sync_id, + retries, + SyncKind::Upload(new_checkpoint), + )); + Some(false) + } + } } -async fn download>( - remote_storage: &S, - destination: &Path, -) -> anyhow::Result<()> { - if destination.exists() { - Ok(()) - } else { - let source = remote_storage.storage_path(destination).with_context(|| { - format!( - "Failed to derive storage source out of download destination '{}'", - destination.display() - ) - })?; +async fn try_upload_checkpoint< + P: Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +>( + config: &'static PageServerConf, + remote_storage: Arc, + sync_id: TimelineSyncId, + new_checkpoint: &NewCheckpoint, + files_to_skip: BTreeSet, +) -> anyhow::Result<(ArchiveHeader, u64)> { + let TimelineSyncId(tenant_id, timeline_id) = sync_id; + let timeline_dir = config.timeline_path(&timeline_id, &tenant_id); - if let Some(target_parent) = destination.parent() { - if !target_parent.exists() { - tokio::fs::create_dir_all(target_parent) - .await - .with_context(|| { - format!( - "Failed to create parent directories for destination '{}'", - destination.display() - ) - })?; + let files_to_upload = new_checkpoint + .layers + .iter() + .filter(|&path_to_upload| { + if files_to_skip.contains(path_to_upload) { + error!( + "Skipping file upload '{}', since it was already uploaded", + path_to_upload.display() + ); + false + } else { + true } - } - let mut destination_file = io::BufWriter::new( - fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(destination) - .await - .with_context(|| { - format!( - "Failed to open download destination file '{}'", - destination.display() - ) - })?, - ); + }) + .collect::>(); + ensure!(!files_to_upload.is_empty(), "No files to upload"); - remote_storage - .download(&source, &mut destination_file) - .await?; - destination_file.flush().await?; - Ok(()) - } + compression::archive_files_as_stream( + &timeline_dir, + files_to_upload.into_iter(), + &new_checkpoint.metadata, + move |archive_streamer, archive_name| async move { + let timeline_dir = config.timeline_path(&timeline_id, &tenant_id); + remote_storage + .upload( + archive_streamer, + &remote_storage.storage_path(&timeline_dir.join(&archive_name))?, + ) + .await + }, + ) + .await + .map(|(header, header_size, _)| (header, header_size)) } #[cfg(test)] @@ -1067,114 +720,82 @@ mod tests { use std::{ collections::{BTreeMap, BTreeSet}, fs, - io::Cursor, }; - use super::*; + use super::{index::reconstruct_from_storage, *}; use crate::{ + layered_repository::metadata::metadata_path, remote_storage::local_fs::LocalFs, repository::repo_harness::{RepoHarness, TIMELINE_ID}, }; - use hex_literal::hex; use tempfile::tempdir; - use tokio::io::BufReader; - - const NO_METADATA_TIMELINE_ID: ZTimelineId = - ZTimelineId::from_array(hex!("3755461d2259a63a80635d760958efd0")); - const CORRUPT_METADATA_TIMELINE_ID: ZTimelineId = - ZTimelineId::from_array(hex!("314db9af91fbc02dda586880a3216c61")); - - #[tokio::test] - async fn upload_new_timeline() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("upload_new_timeline")?; - let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let mut remote_timelines = HashMap::new(); - - assert_timelines_equal( - HashMap::new(), - fetch_existing_uploads(&storage).await.unwrap(), - ); - - let upload_metadata = dummy_metadata(Lsn(0x30)); - let upload = create_local_timeline( - &repo_harness, - TIMELINE_ID, - &["a", "b"], - upload_metadata.clone(), - )?; 
- let expected_layers = upload.layers.clone(); - ensure_correct_timeline_upload( - &repo_harness, - &mut remote_timelines, - &storage, - TIMELINE_ID, - upload, - ) - .await; - - let mut expected_uploads = HashMap::new(); - expected_uploads.insert( - TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID), - RemoteTimeline { - layers: expected_layers, - metadata: Some(upload_metadata), - }, - ); - assert_timelines_equal(expected_uploads, fetch_existing_uploads(&storage).await?); - - Ok(()) - } + use zenith_utils::lsn::Lsn; #[tokio::test] async fn reupload_timeline() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("reupload_timeline")?; let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; + let storage = Arc::new(LocalFs::new( + tempdir()?.path().to_owned(), + &repo_harness.conf.workdir, + )?); let mut remote_timelines = HashMap::new(); - let first_upload_metadata = dummy_metadata(Lsn(0x30)); - let first_timeline = create_local_timeline( + let first_upload_metadata = dummy_metadata(Lsn(0x10)); + let first_checkpoint = create_local_timeline( &repo_harness, TIMELINE_ID, &["a", "b"], first_upload_metadata.clone(), )?; - let first_paths = first_timeline.layers.clone(); + let local_timeline_path = repo_harness.timeline_path(&TIMELINE_ID); ensure_correct_timeline_upload( &repo_harness, &mut remote_timelines, - &storage, + Arc::clone(&storage), TIMELINE_ID, - first_timeline, + first_checkpoint, ) .await; - let after_first_uploads = remote_timelines.clone(); - let new_upload_metadata = dummy_metadata(Lsn(0x20)); - assert!( - new_upload_metadata.disk_consistent_lsn() < first_upload_metadata.disk_consistent_lsn() + let uploaded_timeline = remote_timelines + .get(&sync_id) + .expect("Should have the timeline after the corresponding checkpoint upload"); + let uploaded_archives = uploaded_timeline.stored_archives(); + assert_eq!( + uploaded_archives.len(), + 1, + "Only one archive is expected after a first upload" + ); + let first_uploaded_archive = uploaded_archives.first().copied().unwrap(); + assert_eq!( + uploaded_timeline.latest_disk_consistent_lsn(), + Some(first_upload_metadata.disk_consistent_lsn()), + "Metadata that was uploaded, should have its Lsn stored" + ); + assert_eq!( + uploaded_timeline + .archive_data(uploaded_archives.first().copied().unwrap()) + .unwrap() + .disk_consistent_lsn(), + first_upload_metadata.disk_consistent_lsn(), + "Uploaded archive should have corresponding Lsn" + ); + assert_eq!( + uploaded_timeline.stored_files(&local_timeline_path), + vec![local_timeline_path.join("a"), local_timeline_path.join("b")] + .into_iter() + .collect(), + "Should have all files from the first checkpoint" ); - let new_upload = - create_local_timeline(&repo_harness, TIMELINE_ID, &["b", "c"], new_upload_metadata)?; - upload_timeline( - repo_harness.conf, - &mut remote_timelines, - &storage, - sync_id, - new_upload.clone(), - 0, - ) - .await; - assert_timelines_equal(after_first_uploads, remote_timelines.clone()); let second_upload_metadata = dummy_metadata(Lsn(0x40)); - let second_timeline = create_local_timeline( + let second_checkpoint = create_local_timeline( &repo_harness, TIMELINE_ID, &["b", "c"], second_upload_metadata.clone(), )?; - let second_paths = second_timeline.layers.clone(); assert!( first_upload_metadata.disk_consistent_lsn() < second_upload_metadata.disk_consistent_lsn() @@ -1182,133 +803,190 @@ mod tests { ensure_correct_timeline_upload( &repo_harness, &mut 
remote_timelines, - &storage, + Arc::clone(&storage), TIMELINE_ID, - second_timeline, + second_checkpoint, ) .await; - let mut expected_uploads = HashMap::new(); - let mut expected_layers = first_paths.clone(); - expected_layers.extend(second_paths.clone().into_iter()); - expected_layers.dedup(); - - expected_uploads.insert( - TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID), - RemoteTimeline { - layers: expected_layers, - metadata: Some(second_upload_metadata.clone()), - }, + let updated_timeline = remote_timelines + .get(&sync_id) + .expect("Should have the timeline after 2 checkpoints are uploaded"); + let mut updated_archives = updated_timeline.stored_archives(); + assert_eq!( + updated_archives.len(), + 2, + "Two archives are expected after a successful update of the upload" ); - assert_timelines_equal(expected_uploads, remote_timelines.clone()); - - let third_upload_metadata = dummy_metadata(Lsn(0x50)); - assert!( - second_upload_metadata.disk_consistent_lsn() - < third_upload_metadata.disk_consistent_lsn() + updated_archives.retain(|archive_id| archive_id != &first_uploaded_archive); + assert_eq!( + updated_archives.len(), + 1, + "Only one new archive is expected among the uploaded" ); - let third_timeline = create_local_timeline( + let second_uploaded_archive = updated_archives.last().copied().unwrap(); + assert_eq!( + updated_timeline.latest_disk_consistent_lsn(), + Some(second_upload_metadata.disk_consistent_lsn()), + "Metadata that was uploaded, should have its Lsn stored" + ); + assert_eq!( + updated_timeline + .archive_data(second_uploaded_archive) + .unwrap() + .disk_consistent_lsn(), + second_upload_metadata.disk_consistent_lsn(), + "Uploaded archive should have corresponding Lsn" + ); + assert_eq!( + updated_timeline.stored_files(&local_timeline_path), + vec![ + local_timeline_path.join("a"), + local_timeline_path.join("b"), + local_timeline_path.join("c"), + ] + .into_iter() + .collect(), + "Should have all files from both checkpoints without duplicates" + ); + + let third_upload_metadata = dummy_metadata(Lsn(0x20)); + let third_checkpoint = create_local_timeline( &repo_harness, TIMELINE_ID, - &["d", "e"], + &["d"], third_upload_metadata.clone(), )?; - let third_paths = third_timeline.layers.clone(); + assert_ne!( + third_upload_metadata.disk_consistent_lsn(), + first_upload_metadata.disk_consistent_lsn() + ); + assert!( + third_upload_metadata.disk_consistent_lsn() + < second_upload_metadata.disk_consistent_lsn() + ); ensure_correct_timeline_upload( &repo_harness, &mut remote_timelines, - &storage, + Arc::clone(&storage), TIMELINE_ID, - third_timeline, + third_checkpoint, ) .await; - let mut expected_uploads = HashMap::new(); - let mut expected_layers = first_paths; - expected_layers.extend(second_paths.into_iter()); - expected_layers.extend(third_paths.into_iter()); - expected_layers.dedup(); - - expected_uploads.insert( - TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID), - RemoteTimeline { - layers: expected_layers, - metadata: Some(third_upload_metadata), - }, + let updated_timeline = remote_timelines + .get(&sync_id) + .expect("Should have the timeline after 3 checkpoints are uploaded"); + let mut updated_archives = updated_timeline.stored_archives(); + assert_eq!( + updated_archives.len(), + 3, + "Three archives are expected after two successful updates of the upload" + ); + updated_archives.retain(|archive_id| { + archive_id != &first_uploaded_archive && archive_id != &second_uploaded_archive + }); + assert_eq!( + updated_archives.len(), + 1, + "Only one new 
archive is expected among the uploaded" + ); + let third_uploaded_archive = updated_archives.last().copied().unwrap(); + assert!( + updated_timeline.latest_disk_consistent_lsn().unwrap() + > third_upload_metadata.disk_consistent_lsn(), + "Should not influence the last lsn by uploading an older checkpoint" + ); + assert_eq!( + updated_timeline + .archive_data(third_uploaded_archive) + .unwrap() + .disk_consistent_lsn(), + third_upload_metadata.disk_consistent_lsn(), + "Uploaded archive should have corresponding Lsn" + ); + assert_eq!( + updated_timeline.stored_files(&local_timeline_path), + vec![ + local_timeline_path.join("a"), + local_timeline_path.join("b"), + local_timeline_path.join("c"), + local_timeline_path.join("d"), + ] + .into_iter() + .collect(), + "Should have all files from three checkpoints without duplicates" ); - assert_timelines_equal(expected_uploads, remote_timelines); Ok(()) } #[tokio::test] - async fn reupload_missing_metadata() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("reupload_missing_metadata")?; - let sync_id = TimelineSyncId(repo_harness.tenant_id, NO_METADATA_TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let mut remote_timelines = - store_timelines_with_incorrect_metadata(&repo_harness, &storage).await?; - assert_timelines_equal( - remote_timelines.clone(), - fetch_existing_uploads(&storage).await?, - ); - - let old_remote_timeline = remote_timelines.get(&sync_id).unwrap().clone(); - let updated_metadata = dummy_metadata(Lsn(0x100)); - create_local_metadata(&repo_harness, NO_METADATA_TIMELINE_ID, &updated_metadata)?; - ensure_correct_timeline_upload( - &repo_harness, - &mut remote_timelines, - &storage, - NO_METADATA_TIMELINE_ID, - NewCheckpoint { - layers: old_remote_timeline.layers.clone(), - metadata: updated_metadata.clone(), - }, - ) - .await; - let reuploaded_timelines = fetch_existing_uploads(&storage).await?; - - let mut expected_timeline = RemoteTimeline { - metadata: Some(updated_metadata), - ..old_remote_timeline - }; - expected_timeline.layers.sort(); - let mut updated_timeline = reuploaded_timelines.get(&sync_id).unwrap().clone(); - updated_timeline.layers.sort(); - assert_eq!(expected_timeline, updated_timeline); - - Ok(()) - } - - #[tokio::test] - async fn test_upload_with_errors() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("test_upload_with_errors")?; + async fn reupload_timeline_rejected() -> anyhow::Result<()> { + let repo_harness = RepoHarness::create("reupload_timeline")?; let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; + let storage = Arc::new(LocalFs::new( + tempdir()?.path().to_owned(), + &repo_harness.conf.workdir, + )?); let mut remote_timelines = HashMap::new(); - let local_path = repo_harness.timeline_path(&TIMELINE_ID).join("something"); - assert!(!local_path.exists()); - assert!(fetch_existing_uploads(&storage).await?.is_empty()); + let first_upload_metadata = dummy_metadata(Lsn(0x10)); + let first_checkpoint = create_local_timeline( + &repo_harness, + TIMELINE_ID, + &["a", "b"], + first_upload_metadata.clone(), + )?; + ensure_correct_timeline_upload( + &repo_harness, + &mut remote_timelines, + Arc::clone(&storage), + TIMELINE_ID, + first_checkpoint, + ) + .await; + let after_first_uploads = remote_timelines.clone(); - let timeline_without_local_files = NewCheckpoint { - layers: vec![local_path], - metadata: 
dummy_metadata(Lsn(0x30)), - }; + let normal_upload_metadata = dummy_metadata(Lsn(0x20)); + assert_ne!( + normal_upload_metadata.disk_consistent_lsn(), + first_upload_metadata.disk_consistent_lsn() + ); - upload_timeline( + let checkpoint_with_no_files = create_local_timeline( + &repo_harness, + TIMELINE_ID, + &[], + normal_upload_metadata.clone(), + )?; + upload_timeline_checkpoint( repo_harness.conf, &mut remote_timelines, - &storage, + Arc::clone(&storage), sync_id, - timeline_without_local_files.clone(), + checkpoint_with_no_files, 0, ) .await; + assert_timelines_equal(after_first_uploads.clone(), remote_timelines.clone()); - assert!(fetch_existing_uploads(&storage).await?.is_empty()); - assert!(!repo_harness.timeline_path(&TIMELINE_ID).exists()); + let checkpoint_with_uploaded_lsn = create_local_timeline( + &repo_harness, + TIMELINE_ID, + &["something", "new"], + first_upload_metadata.clone(), + )?; + upload_timeline_checkpoint( + repo_harness.conf, + &mut remote_timelines, + Arc::clone(&storage), + sync_id, + checkpoint_with_uploaded_lsn, + 0, + ) + .await; + assert_timelines_equal(after_first_uploads, remote_timelines); Ok(()) } @@ -1317,11 +995,11 @@ mod tests { async fn test_download_timeline() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("test_download_timeline")?; let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let mut remote_timelines = - store_timelines_with_incorrect_metadata(&repo_harness, &storage).await?; - fs::remove_dir_all(repo_harness.timeline_path(&NO_METADATA_TIMELINE_ID))?; - fs::remove_dir_all(repo_harness.timeline_path(&CORRUPT_METADATA_TIMELINE_ID))?; + let storage = Arc::new(LocalFs::new( + tempdir()?.path().to_owned(), + &repo_harness.conf.workdir, + )?); + let mut remote_timelines = HashMap::new(); let regular_timeline_path = repo_harness.timeline_path(&TIMELINE_ID); let regular_timeline = create_local_timeline( @@ -1333,7 +1011,7 @@ mod tests { ensure_correct_timeline_upload( &repo_harness, &mut remote_timelines, - &storage, + Arc::clone(&storage), TIMELINE_ID, regular_timeline, ) @@ -1343,105 +1021,21 @@ mod tests { download_timeline( repo_harness.conf, - &storage, + &remote_timelines, + Arc::clone(&storage), sync_id, - remote_regular_timeline.clone(), - 0, - ) - .await; - download_timeline( - repo_harness.conf, - &storage, - sync_id, - remote_regular_timeline.clone(), - 0, - ) - .await; - download_timeline( - repo_harness.conf, - &storage, - sync_id, - remote_timelines.get(&sync_id).unwrap().clone(), - 0, - ) - .await; - download_timeline( - repo_harness.conf, - &storage, - sync_id, - remote_timelines.get(&sync_id).unwrap().clone(), - 0, - ) - .await; - - assert_timelines_equal(remote_timelines, fetch_existing_uploads(&storage).await?); - assert!(!repo_harness - .timeline_path(&NO_METADATA_TIMELINE_ID) - .exists()); - assert!(!repo_harness - .timeline_path(&CORRUPT_METADATA_TIMELINE_ID) - .exists()); - assert_timeline_files_match(&repo_harness, TIMELINE_ID, remote_regular_timeline); - - Ok(()) - } - - #[tokio::test] - async fn metadata_file_sync() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("metadata_file_sync")?; - let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let mut remote_timelines = HashMap::new(); - - let uploaded_metadata = dummy_metadata(Lsn(0x30)); - let metadata_local_path = - 
metadata_path(repo_harness.conf, TIMELINE_ID, repo_harness.tenant_id); - let new_upload = create_local_timeline( - &repo_harness, - TIMELINE_ID, - &["a", "b"], - uploaded_metadata.clone(), - )?; - tokio::fs::write(&metadata_local_path, b"incorrect metadata").await?; - - upload_timeline( - repo_harness.conf, - &mut remote_timelines, - &storage, - sync_id, - new_upload.clone(), + TimelineDownload { + files_to_skip: Arc::new(BTreeSet::new()), + archives_to_download: remote_regular_timeline.stored_archives(), + }, 0, ) .await; assert_timelines_equal( - remote_timelines.clone(), - fetch_existing_uploads(&storage).await?, - ); - - let remote_timeline = remote_timelines.get(&sync_id).unwrap().clone(); - assert_eq!( - remote_timeline.metadata.as_ref(), - Some(&uploaded_metadata), - "Local corrputed metadata should be ignored when uploading an image" - ); - - download_timeline( - repo_harness.conf, - &storage, - sync_id, - remote_timeline.clone(), - 0, - ) - .await; - let downloaded_metadata_bytes = tokio::fs::read(&metadata_local_path) - .await - .expect("Failed to read metadata file contents after redownload"); - let downloaded_metadata = TimelineMetadata::from_bytes(&downloaded_metadata_bytes) - .expect("Failed to parse metadata file contents after redownload"); - assert_eq!( - downloaded_metadata, uploaded_metadata, - "Should redownload the same metadata that was uploaed" + remote_timelines, + reconstruct_from_storage(storage.as_ref()).await?, ); + assert_timeline_files_match(&repo_harness, TIMELINE_ID, remote_regular_timeline); Ok(()) } @@ -1450,15 +1044,15 @@ mod tests { async fn ensure_correct_timeline_upload( harness: &RepoHarness, remote_timelines: &mut HashMap, - remote_storage: &LocalFs, + remote_storage: Arc, timeline_id: ZTimelineId, new_upload: NewCheckpoint, ) { let sync_id = TimelineSyncId(harness.tenant_id, timeline_id); - upload_timeline( + upload_timeline_checkpoint( harness.conf, remote_timelines, - remote_storage, + Arc::clone(&remote_storage), sync_id, new_upload.clone(), 0, @@ -1466,20 +1060,29 @@ mod tests { .await; assert_timelines_equal( remote_timelines.clone(), - fetch_existing_uploads(remote_storage).await.unwrap(), + reconstruct_from_storage(remote_storage.as_ref()) + .await + .unwrap(), ); - let new_remote_files = remote_timelines.get(&sync_id).unwrap().clone(); - assert_eq!( - new_remote_files.metadata, - Some(new_upload.metadata.clone()), - "Remote timeline should have an updated metadata with later Lsn after successful reupload" + let new_remote_timeline = remote_timelines.get(&sync_id).unwrap().clone(); + let new_remote_lsn = new_remote_timeline + .latest_disk_consistent_lsn() + .expect("Remote timeline should have an lsn after reupload"); + let upload_lsn = new_upload.metadata.disk_consistent_lsn(); + assert!( + new_remote_lsn >= upload_lsn, + "Remote timeline after upload should have the biggest Lsn out of all uploads" ); - let remote_files_after_upload = new_remote_files - .layers - .clone() - .into_iter() - .collect::>(); + assert!( + new_remote_timeline + .stored_archives() + .contains(&ArchiveId(upload_lsn)), + "Should contain upload lsn among the remote ones" + ); + + let remote_files_after_upload = new_remote_timeline + .stored_files(&harness.conf.timeline_path(&timeline_id, &harness.tenant_id)); for new_uploaded_layer in &new_upload.layers { assert!( remote_files_after_upload.contains(new_uploaded_layer), @@ -1488,30 +1091,16 @@ mod tests { ); } - assert_timeline_files_match(harness, timeline_id, new_remote_files); + assert_timeline_files_match(harness, 
timeline_id, new_remote_timeline); } #[track_caller] fn assert_timelines_equal( - mut expected: HashMap, - mut actual: HashMap, + expected: HashMap, + actual: HashMap, ) { - let expected_sorted = expected - .iter_mut() - .map(|(key, remote_timeline)| { - remote_timeline.layers.sort(); - (key, remote_timeline) - }) - .collect::>(); - - let actual_sorted = actual - .iter_mut() - .map(|(key, remote_timeline)| { - remote_timeline.layers.sort(); - (key, remote_timeline) - }) - .collect::>(); - + let expected_sorted = expected.iter().collect::>(); + let actual_sorted = actual.iter().collect::>(); assert_eq!( expected_sorted, actual_sorted, "Different timeline contents" @@ -1520,26 +1109,31 @@ mod tests { fn assert_timeline_files_match( harness: &RepoHarness, - timeline_id: ZTimelineId, - remote_files: RemoteTimeline, + remote_timeline_id: ZTimelineId, + remote_timeline: RemoteTimeline, ) { - let local_timeline_dir = harness.timeline_path(&timeline_id); + let local_timeline_dir = harness.timeline_path(&remote_timeline_id); let local_paths = fs::read_dir(&local_timeline_dir) .unwrap() .map(|dir| dir.unwrap().path()) .collect::>(); - let mut reported_remote_files = remote_files.layers.into_iter().collect::>(); - if let Some(remote_metadata) = remote_files.metadata { - let local_metadata_path = metadata_path(harness.conf, timeline_id, harness.tenant_id); - let local_metadata = TimelineMetadata::from_bytes( - &fs::read(&local_metadata_path).expect("Failed to read metadata file when comparing remote and local image files") - ).expect("Failed to parse metadata file contents when comparing remote and local image files"); - assert_eq!( - local_metadata, remote_metadata, - "Timeline remote metadata is different the local one" - ); - reported_remote_files.insert(local_metadata_path); - } + let mut reported_remote_files = remote_timeline.stored_files(&local_timeline_dir); + let local_metadata_path = + metadata_path(harness.conf, remote_timeline_id, harness.tenant_id); + let local_metadata = TimelineMetadata::from_bytes( + &fs::read(&local_metadata_path) + .expect("Failed to read metadata file when comparing remote and local image files"), + ) + .expect( + "Failed to parse metadata file contents when comparing remote and local image files", + ); + assert!( + remote_timeline + .stored_archives() + .contains(&ArchiveId(local_metadata.disk_consistent_lsn())), + "Should contain local lsn among the remote ones after the upload" + ); + reported_remote_files.insert(local_metadata_path); assert_eq!( local_paths, reported_remote_files, @@ -1569,64 +1163,6 @@ mod tests { } } - async fn store_timelines_with_incorrect_metadata( - harness: &RepoHarness, - storage: &LocalFs, - ) -> anyhow::Result> { - let mut remote_timelines = HashMap::new(); - - ensure_correct_timeline_upload( - harness, - &mut remote_timelines, - storage, - NO_METADATA_TIMELINE_ID, - create_local_timeline( - harness, - NO_METADATA_TIMELINE_ID, - &["a1", "b1"], - dummy_metadata(Lsn(0)), - )?, - ) - .await; - ensure_correct_timeline_upload( - harness, - &mut remote_timelines, - storage, - CORRUPT_METADATA_TIMELINE_ID, - create_local_timeline( - harness, - CORRUPT_METADATA_TIMELINE_ID, - &["a2", "b2"], - dummy_metadata(Lsn(0)), - )?, - ) - .await; - - storage - .delete(&storage.storage_path(&metadata_path( - harness.conf, - NO_METADATA_TIMELINE_ID, - harness.tenant_id, - ))?) 
- .await?; - storage - .upload( - BufReader::new(Cursor::new("corrupt meta".to_string().into_bytes())), - &storage.storage_path(&metadata_path( - harness.conf, - CORRUPT_METADATA_TIMELINE_ID, - harness.tenant_id, - ))?, - ) - .await?; - - for remote_file in remote_timelines.values_mut() { - remote_file.metadata = None; - } - - Ok(remote_timelines) - } - fn create_local_timeline( harness: &RepoHarness, timeline_id: ZTimelineId, @@ -1643,21 +1179,12 @@ mod tests { layers.push(file_path); } - create_local_metadata(harness, timeline_id, &metadata)?; - - Ok(NewCheckpoint { layers, metadata }) - } - - fn create_local_metadata( - harness: &RepoHarness, - timeline_id: ZTimelineId, - metadata: &TimelineMetadata, - ) -> anyhow::Result<()> { fs::write( metadata_path(harness.conf, timeline_id, harness.tenant_id), metadata.to_bytes()?, )?; - Ok(()) + + Ok(NewCheckpoint { layers, metadata }) } fn dummy_contents(name: &str) -> String { diff --git a/pageserver/src/remote_storage/storage_sync/compression.rs b/pageserver/src/remote_storage/storage_sync/compression.rs new file mode 100644 index 0000000000..4f0db1e393 --- /dev/null +++ b/pageserver/src/remote_storage/storage_sync/compression.rs @@ -0,0 +1,628 @@ +//! A set of methods to asynchronously compress and uncompress a stream of data, without holding the entire data in memory. +//! +//! For that, both compress and uncompress functions operate on buffered streams (currently hardcoded size of [`ARCHIVE_STREAM_BUFFER_SIZE_BYTES`]), +//! not attempting to hold the entire archive in memory. +//! +//! With those ideas, the compression is done with the zstd streaming compression algorithm via the `async-compression` crate. +//! The crate does not contain any knobs to tweak the compression, but otherwise is one of the only crates that is both async and has an API to manage a part of an archive. +//! Zstd was picked as the best algorithm among the ones available in the crate, after testing the initial timeline file compression. +//! +//! Archiving is almost agnostic to timeline file types, with the exception of the metadata file, which is currently distinguished in the [un]compression code. +//! +//! Archive structure: +//! +----------------------------------------+ +//! | header | file_1, ..., file_k, metadata | +//! +----------------------------------------+ +//! +//! The archive consists of two separate parts: +//! * header, that contains all file names, their sizes and relative paths in the timeline directory +//! Header is a Rust structure, serialized into bytes and compressed with zstd. +//! * files part, that has the metadata file as the last one, all compressed with zstd into a single binary blob +//! +//! Header offset is stored in the file name, along with the `disk_consistent_lsn` from the metadata file. +//! See [`parse_archive_name`] and [`ARCHIVE_EXTENSION`] for the name details, example: `00000000016B9150-.zst_9732`. +//! This way, the header could be retrieved without reading an entire archive file. +//! +//! The files are stored with the metadata as the last file, to reduce the risk of corrupting the metadata file.
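+//!
+//! For illustration, a rough standalone sketch (not part of this module's API; the actual implementations are [`parse_archive_name`] and [`read_archive_header`] below) of how such a name decodes:
+//! ```ignore
+//! // "00000000016B9150-.zst_9732" -> (disk_consistent_lsn, compressed header size)
+//! let (lsn_hex, header_size) = "00000000016B9150-.zst_9732".rsplit_once("-.zst_").unwrap();
+//! assert_eq!(u64::from_str_radix(lsn_hex, 16).unwrap(), 0x0000_0000_016B_9150);
+//! assert_eq!(header_size.parse::<u64>().unwrap(), 9732);
+//! // compressed header: bytes [0, header_size); compressed files part: bytes [header_size, ..)
+//! ```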
+ +use std::{ + collections::BTreeSet, + future::Future, + io::Cursor, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{anyhow, bail, ensure, Context}; +use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder}; +use serde::{Deserialize, Serialize}; +use tokio::{ + fs, + io::{self, AsyncReadExt, AsyncWriteExt}, +}; +use tracing::*; +use zenith_utils::{bin_ser::BeSer, lsn::Lsn}; + +use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ArchiveHeader { + pub files: Vec<FileEntry>, + // Metadata file name is known to the system, as is its location relative to the timeline dir, + // so there is no need to store anything but its size in bytes. + pub metadata_file_size: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct FileEntry { + /// Uncompressed file size, bytes. + pub size: u64, + /// A path, relative to the directory root, used when compressing the directory contents. + pub subpath: String, +} + +const ARCHIVE_EXTENSION: &str = "-.zst_"; +const ARCHIVE_STREAM_BUFFER_SIZE_BYTES: usize = 4 * 1024 * 1024; + +/// Streams an archive of the given files into a stream target, defined by the closure. +/// +/// The closure approach is picked for cases like S3, where we would need a name of the file before we can get a stream to write the bytes into. +/// The current idea is to place the header size in the name of the file, to enable the fast partial remote file index restoration without actually reading remote storage file contents. +/// +/// Performs the compression in multiple steps: +/// * prepares an archive header, stripping the `source_dir` prefix from the `files` +/// * generates the name of the archive +/// * prepares archive producer future, knowing the header and the file list +/// An `impl AsyncRead` and `impl AsyncWrite` pair of connected streams is created to implement the partial contents streaming. +/// The writer end gets into the archive producer future, to put the header and a stream of compressed files. +/// * prepares archive consumer future, by executing the provided closure +/// The closure gets the reader end stream and the name of the file to create a future that would stream the file contents elsewhere. +/// * runs and waits for both futures to complete +/// * on a successful completion of both futures, the header, its size and the user-defined consumer future return data are returned +/// Due to the design above, the archive name and related data is visible inside the consumer future only, so it's possible to return the data +/// needed for future processing.
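+///
+/// A rough usage sketch (illustrative only, mirroring this module's tests; `timeline_dir`, `checkpoint_files`, `metadata` and `destination_dir` are assumed to exist in the caller):
+/// ```ignore
+/// let (_header, _header_size, uploaded_to) = archive_files_as_stream(
+///     &timeline_dir,
+///     checkpoint_files.iter(),
+///     &metadata,
+///     move |mut archive_streamer, archive_name| async move {
+///         // The consumer decides where the compressed bytes go: here a local file,
+///         // for S3 it would be a PUT of an object named `archive_name`.
+///         let target = destination_dir.join(&archive_name);
+///         let mut file = tokio::fs::File::create(&target).await?;
+///         tokio::io::copy(&mut archive_streamer, &mut file).await?;
+///         Ok(target)
+///     },
+/// )
+/// .await?;
+/// ```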
+pub async fn archive_files_as_stream( + source_dir: &Path, + files: impl Iterator, + metadata: &TimelineMetadata, + create_archive_consumer: Cons, +) -> anyhow::Result<(ArchiveHeader, u64, ConsRet)> +where + Cons: FnOnce(Box, String) -> Fut + + Send + + 'static, + Fut: Future> + Send + 'static, + ConsRet: Send + Sync + 'static, +{ + let metadata_bytes = metadata + .to_bytes() + .context("Failed to create metadata bytes")?; + let (archive_header, compressed_header_bytes) = + prepare_header(source_dir, files, &metadata_bytes) + .await + .context("Failed to prepare file for archivation")?; + + let header_size = compressed_header_bytes.len() as u64; + let (write, read) = io::duplex(ARCHIVE_STREAM_BUFFER_SIZE_BYTES); + let archive_filler = write_archive_contents( + source_dir.to_path_buf(), + archive_header.clone(), + metadata_bytes, + write, + ); + let archive_name = archive_name(metadata.disk_consistent_lsn(), header_size); + let archive_stream = + Cursor::new(compressed_header_bytes).chain(ZstdEncoder::new(io::BufReader::new(read))); + + let (archive_creation_result, archive_upload_result) = tokio::join!( + tokio::spawn(archive_filler), + tokio::spawn(async move { + create_archive_consumer(Box::new(archive_stream), archive_name).await + }) + ); + archive_creation_result + .context("Failed to spawn archive creation future")? + .context("Failed to create an archive")?; + let upload_return_value = archive_upload_result + .context("Failed to spawn archive upload future")? + .context("Failed to upload the archive")?; + + Ok((archive_header, header_size, upload_return_value)) +} + +/// Similar to [`archive_files_as_stream`], creates a pair of streams to uncompress the 2nd part of the archive, +/// that contains files and is located after the header. +/// S3 allows downloading partial file contents for a given file key (i.e. name), to accomodate this retrieval, +/// a closure is used. +/// Same concepts with two concurrent futures, user-defined closure, future and return value apply here, but the +/// consumer and the receiver ends are swapped, since the uncompression happens. +pub async fn uncompress_file_stream_with_index( + destination_dir: PathBuf, + files_to_skip: Arc>, + disk_consistent_lsn: Lsn, + header: ArchiveHeader, + header_size: u64, + create_archive_file_part: Prod, +) -> anyhow::Result +where + Prod: FnOnce(Box, String) -> Fut + + Send + + 'static, + Fut: Future> + Send + 'static, + ProdRet: Send + Sync + 'static, +{ + let (write, mut read) = io::duplex(ARCHIVE_STREAM_BUFFER_SIZE_BYTES); + let archive_name = archive_name(disk_consistent_lsn, header_size); + + let (archive_download_result, archive_uncompress_result) = tokio::join!( + tokio::spawn(async move { create_archive_file_part(Box::new(write), archive_name).await }), + tokio::spawn(async move { + uncompress_with_header(&files_to_skip, &destination_dir, header, &mut read).await + }) + ); + + let download_value = archive_download_result + .context("Failed to spawn archive download future")? + .context("Failed to download an archive")?; + archive_uncompress_result + .context("Failed to spawn archive uncompress future")? 
+ .context("Failed to uncompress the archive")?; + + Ok(download_value) +} + +/// Reads archive header from the stream given: +/// * parses the file name to get the header size +/// * reads the exact amount of bytes +/// * uncompresses and deserializes those +pub async fn read_archive_header( + archive_name: &str, + from: &mut A, +) -> anyhow::Result { + let (_, header_size) = parse_archive_name(Path::new(archive_name))?; + + let mut compressed_header_bytes = vec![0; header_size as usize]; + from.read_exact(&mut compressed_header_bytes) + .await + .with_context(|| { + format!( + "Failed to read header header from the archive {}", + archive_name + ) + })?; + + let mut header_bytes = Vec::new(); + ZstdDecoder::new(io::BufReader::new(compressed_header_bytes.as_slice())) + .read_to_end(&mut header_bytes) + .await + .context("Failed to decompress a header from the archive")?; + header_bytes + .flush() + .await + .context("Failed to decompress a header from the archive")?; + + Ok(ArchiveHeader::des(&header_bytes) + .context("Failed to deserialize a header from the archive")?) +} + +/// Reads the archive metadata out of the archive name: +/// * `disk_consistent_lsn` of the checkpoint that was archived +/// * size of the archive header +pub fn parse_archive_name(archive_path: &Path) -> anyhow::Result<(Lsn, u64)> { + let archive_name = archive_path + .file_name() + .ok_or_else(|| anyhow!("Archive '{}' has no file name", archive_path.display()))? + .to_string_lossy(); + let (lsn_str, header_size_str) = + archive_name.rsplit_once(ARCHIVE_EXTENSION).ok_or_else(|| { + anyhow!( + "Archive '{}' has incorrect extension, expected to contain '{}'", + archive_path.display(), + ARCHIVE_EXTENSION + ) + })?; + let disk_consistent_lsn = Lsn::from_hex(lsn_str).with_context(|| { + format!( + "Archive '{}' has an invalid disk consistent lsn in its extension", + archive_path.display(), + ) + })?; + let header_size = header_size_str.parse::().with_context(|| { + format!( + "Archive '{}' has an invalid a header offset number in its extension", + archive_path.display(), + ) + })?; + Ok((disk_consistent_lsn, header_size)) +} + +fn archive_name(disk_consistent_lsn: Lsn, header_size: u64) -> String { + let archive_name = format!( + "{:016X}{ARCHIVE_EXTENSION}{}", + u64::from(disk_consistent_lsn), + header_size, + ARCHIVE_EXTENSION = ARCHIVE_EXTENSION, + ); + archive_name +} + +async fn uncompress_with_header( + files_to_skip: &BTreeSet, + destination_dir: &Path, + header: ArchiveHeader, + archive_after_header: impl io::AsyncRead + Send + Sync + Unpin, +) -> anyhow::Result<()> { + debug!("Uncompressing archive into {}", destination_dir.display()); + let mut archive = ZstdDecoder::new(io::BufReader::new(archive_after_header)); + + if !destination_dir.exists() { + fs::create_dir_all(&destination_dir) + .await + .with_context(|| { + format!( + "Failed to create target directory at {}", + destination_dir.display() + ) + })?; + } else if !destination_dir.is_dir() { + bail!( + "Destination path '{}' is not a valid directory", + destination_dir.display() + ); + } + debug!("Will extract {} files from the archive", header.files.len()); + for entry in header.files { + uncompress_entry( + &mut archive, + &entry.subpath, + entry.size, + files_to_skip, + destination_dir, + ) + .await + .with_context(|| format!("Failed to uncompress archive entry {:?}", entry))?; + } + uncompress_entry( + &mut archive, + METADATA_FILE_NAME, + header.metadata_file_size, + files_to_skip, + destination_dir, + ) + .await + .context("Failed to uncompress the 
metadata entry")?; + Ok(()) +} + +async fn uncompress_entry( + archive: &mut ZstdDecoder>, + entry_subpath: &str, + entry_size: u64, + files_to_skip: &BTreeSet, + destination_dir: &Path, +) -> anyhow::Result<()> { + let destination_path = destination_dir.join(entry_subpath); + if let Some(parent) = destination_path.parent() { + fs::create_dir_all(parent).await.with_context(|| { + format!( + "Failed to create parent directory for {}", + destination_path.display() + ) + })?; + }; + + if files_to_skip.contains(destination_path.as_path()) { + debug!("Skipping {}", destination_path.display()); + read_n_bytes(entry_size, archive, &mut io::sink()) + .await + .context("Failed to skip bytes in the archive")?; + return Ok(()); + } + + let mut destination = + io::BufWriter::new(fs::File::create(&destination_path).await.with_context(|| { + format!( + "Failed to open file {} for extraction", + destination_path.display() + ) + })?); + read_n_bytes(entry_size, archive, &mut destination) + .await + .with_context(|| { + format!( + "Failed to write extracted archive contents into file {}", + destination_path.display() + ) + })?; + destination + .flush() + .await + .context("Failed to flush the streaming archive bytes")?; + Ok(()) +} + +async fn write_archive_contents( + source_dir: PathBuf, + header: ArchiveHeader, + metadata_bytes: Vec, + mut archive_input: io::DuplexStream, +) -> anyhow::Result<()> { + debug!("Starting writing files into archive"); + for file_entry in header.files { + let path = source_dir.join(&file_entry.subpath); + let mut source_file = + io::BufReader::new(fs::File::open(&path).await.with_context(|| { + format!( + "Failed to open file for archiving to path {}", + path.display() + ) + })?); + let bytes_written = io::copy(&mut source_file, &mut archive_input) + .await + .with_context(|| { + format!( + "Failed to open add a file into archive, file path {}", + path.display() + ) + })?; + ensure!( + file_entry.size == bytes_written, + "File {} was written to the archive incompletely", + path.display() + ); + trace!( + "Added file '{}' ({} bytes) into the archive", + path.display(), + bytes_written + ); + } + let metadata_bytes_written = io::copy(&mut metadata_bytes.as_slice(), &mut archive_input) + .await + .with_context(|| "Failed to add metadata into the archive")?; + ensure!( + header.metadata_file_size == metadata_bytes_written, + "Metadata file was written to the archive incompletely", + ); + + archive_input + .shutdown() + .await + .context("Failed to finalize the archive")?; + debug!("Successfully streamed all files into the archive"); + Ok(()) +} + +async fn prepare_header( + source_dir: &Path, + files: impl Iterator, + metadata_bytes: &[u8], +) -> anyhow::Result<(ArchiveHeader, Vec)> { + let mut archive_files = Vec::new(); + for file_path in files { + let file_metadata = fs::metadata(file_path).await.with_context(|| { + format!( + "Failed to read metadata during archive indexing for {}", + file_path.display() + ) + })?; + ensure!( + file_metadata.is_file(), + "Archive indexed path {} is not a file", + file_path.display() + ); + + if file_path.file_name().and_then(|name| name.to_str()) != Some(METADATA_FILE_NAME) { + let entry = FileEntry { + subpath: file_path + .strip_prefix(&source_dir) + .with_context(|| { + format!( + "File '{}' does not belong to pageserver workspace", + file_path.display() + ) + })? 
+ .to_string_lossy() + .to_string(), + size: file_metadata.len(), + }; + archive_files.push(entry); + } + } + + let header = ArchiveHeader { + files: archive_files, + metadata_file_size: metadata_bytes.len() as u64, + }; + + debug!("Appending a header for {} files", header.files.len()); + let header_bytes = header.ser().context("Failed to serialize a header")?; + debug!("Header bytes len {}", header_bytes.len()); + let mut compressed_header_bytes = Vec::new(); + ZstdEncoder::new(io::BufReader::new(header_bytes.as_slice())) + .read_to_end(&mut compressed_header_bytes) + .await + .context("Failed to compress header bytes")?; + debug!( + "Compressed header bytes len {}", + compressed_header_bytes.len() + ); + Ok((header, compressed_header_bytes)) +} + +async fn read_n_bytes( + n: u64, + from: &mut (impl io::AsyncRead + Send + Sync + Unpin), + into: &mut (impl io::AsyncWrite + Send + Sync + Unpin), +) -> anyhow::Result<()> { + let mut bytes_unread = n; + while bytes_unread > 0 { + let mut buf = vec![0; bytes_unread as usize]; + let bytes_read = from.read(&mut buf).await?; + if bytes_read == 0 { + break; + } + into.write_all(&buf[0..bytes_read]).await?; + bytes_unread -= bytes_read as u64; + } + ensure!( + bytes_unread == 0, + "Failed to read exactly {} bytes from the input, bytes unread: {}", + n, + bytes_unread, + ); + Ok(()) +} + +#[cfg(test)] +mod tests { + use tokio::{fs, io::AsyncSeekExt}; + + use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID}; + + use super::*; + + #[tokio::test] + async fn compress_and_uncompress() -> anyhow::Result<()> { + let repo_harness = RepoHarness::create("compress_and_uncompress")?; + let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID); + init_directory( + &timeline_dir, + vec![ + ("first", "first_contents"), + ("second", "second_contents"), + (METADATA_FILE_NAME, "wrong_metadata"), + ], + ) + .await?; + let timeline_files = list_file_paths_with_contents(&timeline_dir).await?; + assert_eq!( + timeline_files, + vec![ + ( + timeline_dir.join("first"), + FileContents::Text("first_contents".to_string()) + ), + ( + timeline_dir.join(METADATA_FILE_NAME), + FileContents::Text("wrong_metadata".to_string()) + ), + ( + timeline_dir.join("second"), + FileContents::Text("second_contents".to_string()) + ), + ], + "Initial timeline contents should contain two normal files and a wrong metadata file" + ); + + let metadata = TimelineMetadata::new(Lsn(0x30), None, None, Lsn(0), Lsn(0), Lsn(0)); + let paths_to_archive = timeline_files + .into_iter() + .map(|(path, _)| path) + .collect::>(); + + let tempdir = tempfile::tempdir()?; + let base_path = tempdir.path().to_path_buf(); + let (header, header_size, archive_target) = archive_files_as_stream( + &timeline_dir, + paths_to_archive.iter(), + &metadata, + move |mut archive_streamer, archive_name| async move { + let archive_target = base_path.join(&archive_name); + let mut archive_file = fs::File::create(&archive_target).await?; + io::copy(&mut archive_streamer, &mut archive_file).await?; + Ok(archive_target) + }, + ) + .await?; + + let mut file = fs::File::open(&archive_target).await?; + file.seek(io::SeekFrom::Start(header_size)).await?; + let target_dir = tempdir.path().join("extracted"); + uncompress_with_header(&BTreeSet::new(), &target_dir, header, file).await?; + + let extracted_files = list_file_paths_with_contents(&target_dir).await?; + + assert_eq!( + extracted_files, + vec![ + ( + target_dir.join("first"), + FileContents::Text("first_contents".to_string()) + ), + ( + 
target_dir.join(METADATA_FILE_NAME), + FileContents::Binary(metadata.to_bytes()?) + ), + ( + target_dir.join("second"), + FileContents::Text("second_contents".to_string()) + ), + ], + "Extracted files should contain all local timeline files besides its metadata, which should be taken from the arguments" + ); + + Ok(()) + } + + async fn init_directory( + root: &Path, + files_with_contents: Vec<(&str, &str)>, + ) -> anyhow::Result<()> { + fs::create_dir_all(root).await?; + for (file_name, contents) in files_with_contents { + fs::File::create(root.join(file_name)) + .await? + .write_all(contents.as_bytes()) + .await?; + } + Ok(()) + } + + #[derive(PartialEq, Eq, PartialOrd, Ord)] + enum FileContents { + Text(String), + Binary(Vec), + } + + impl std::fmt::Debug for FileContents { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Text(text) => f.debug_tuple("Text").field(text).finish(), + Self::Binary(bytes) => f + .debug_tuple("Binary") + .field(&format!("{} bytes", bytes.len())) + .finish(), + } + } + } + + async fn list_file_paths_with_contents( + root: &Path, + ) -> anyhow::Result> { + let mut file_paths = Vec::new(); + + let mut dir_listings = vec![fs::read_dir(root).await?]; + while let Some(mut dir_listing) = dir_listings.pop() { + while let Some(entry) = dir_listing.next_entry().await? { + let entry_path = entry.path(); + if entry_path.is_file() { + let contents = match String::from_utf8(fs::read(&entry_path).await?) { + Ok(text) => FileContents::Text(text), + Err(e) => FileContents::Binary(e.into_bytes()), + }; + file_paths.push((entry_path, contents)); + } else if entry_path.is_dir() { + dir_listings.push(fs::read_dir(entry_path).await?); + } else { + info!( + "Skipping path '{}' as it's not a file or a directory", + entry_path.display() + ); + } + } + } + + file_paths.sort(); + Ok(file_paths) + } +} diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs new file mode 100644 index 0000000000..5a97e44955 --- /dev/null +++ b/pageserver/src/remote_storage/storage_sync/index.rs @@ -0,0 +1,375 @@ +//! In-memory index to track the timeline files in the remote strorage's archives. +//! Able to restore itself from the storage archive data and reconstruct archive indices on demand. +//! +//! The index is intended to be portable, so deliberately does not store any local paths inside. +//! This way in the future, the index could be restored fast from its serialized stored form. + +use std::{ + collections::{BTreeMap, BTreeSet, HashMap}, + path::{Path, PathBuf}, +}; + +use anyhow::{anyhow, bail, ensure, Context}; +use futures::stream::{FuturesUnordered, StreamExt}; +use tracing::error; +use zenith_utils::{ + lsn::Lsn, + zid::{ZTenantId, ZTimelineId}, +}; + +use crate::{ + layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}, + remote_storage::{ + storage_sync::compression::{parse_archive_name, FileEntry}, + RemoteStorage, TimelineSyncId, + }, +}; + +use super::compression::{read_archive_header, ArchiveHeader}; + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +pub struct ArchiveId(pub(super) Lsn); + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +struct FileId(ArchiveId, ArchiveEntryNumber); + +type ArchiveEntryNumber = usize; + +/// All archives and files in them, representing a certain timeline. +/// Uses file and archive IDs to reference those without ownership issues. 
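+///
+/// A rough sketch of the intended flow (illustrative only, mirrors this module's tests):
+/// ```ignore
+/// let mut remote_timeline = RemoteTimeline::empty();
+/// // After a checkpoint archive is uploaded (or while restoring the index from the storage),
+/// // remember which files it holds and how large its compressed header is:
+/// remote_timeline.set_archive_contents(disk_consistent_lsn, archive_header, header_size);
+/// // Later, rebuild the same header to uncompress that archive without re-reading its header bytes remotely:
+/// let (header, header_size) = remote_timeline.restore_header(ArchiveId(disk_consistent_lsn))?;
+/// ```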
+#[derive(Debug, PartialEq, Eq, Clone)] +pub struct RemoteTimeline { + timeline_files: BTreeMap, + checkpoint_archives: BTreeMap, +} + +/// Archive metadata, enough to restore a header with the timeline data. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct CheckpointArchive { + disk_consistent_lsn: Lsn, + metadata_file_size: u64, + files: BTreeSet, + archive_header_size: u64, +} + +impl CheckpointArchive { + pub fn disk_consistent_lsn(&self) -> Lsn { + self.disk_consistent_lsn + } +} + +impl RemoteTimeline { + pub fn empty() -> Self { + Self { + timeline_files: BTreeMap::new(), + checkpoint_archives: BTreeMap::new(), + } + } + + /// Lists all relish files in the given remote timeline. Omits the metadata file. + pub fn stored_files(&self, timeline_dir: &Path) -> BTreeSet { + self.timeline_files + .values() + .map(|file_entry| timeline_dir.join(&file_entry.subpath)) + .collect() + } + + pub fn stored_archives(&self) -> Vec { + self.checkpoint_archives.keys().copied().collect() + } + + #[cfg(test)] + pub fn latest_disk_consistent_lsn(&self) -> Option { + self.checkpoint_archives + .keys() + .last() + .map(|archive_id| archive_id.0) + } + + pub fn contains_archive(&self, disk_consistent_lsn: Lsn) -> bool { + self.checkpoint_archives + .contains_key(&ArchiveId(disk_consistent_lsn)) + } + + pub fn archive_data(&self, archive_id: ArchiveId) -> Option<&CheckpointArchive> { + self.checkpoint_archives.get(&archive_id) + } + + /// Restores a header of a certain remote archive from the memory data. + /// Returns the header and its compressed size in the archive, both can be used to uncompress that archive. + pub fn restore_header(&self, archive_id: ArchiveId) -> anyhow::Result<(ArchiveHeader, u64)> { + let archive = self + .checkpoint_archives + .get(&archive_id) + .ok_or_else(|| anyhow!("Archive {:?} not found", archive_id))?; + + let mut header_files = Vec::with_capacity(archive.files.len()); + for (expected_archive_position, archive_file) in archive.files.iter().enumerate() { + let &FileId(archive_id, archive_position) = archive_file; + ensure!( + expected_archive_position == archive_position, + "Archive header is corrupt, file # {} from archive {:?} header is missing", + expected_archive_position, + archive_id, + ); + + let timeline_file = self.timeline_files.get(archive_file).ok_or_else(|| { + anyhow!( + "File with id {:?} not found for archive {:?}", + archive_file, + archive_id + ) + })?; + header_files.push(timeline_file.clone()); + } + + Ok(( + ArchiveHeader { + files: header_files, + metadata_file_size: archive.metadata_file_size, + }, + archive.archive_header_size, + )) + } + + /// Updates (creates, if necessary) the data about a certain archive contents. 
+ pub fn set_archive_contents( + &mut self, + disk_consistent_lsn: Lsn, + header: ArchiveHeader, + header_size: u64, + ) { + let archive_id = ArchiveId(disk_consistent_lsn); + let mut common_archive_files = BTreeSet::new(); + for (file_index, file_entry) in header.files.into_iter().enumerate() { + let file_id = FileId(archive_id, file_index); + self.timeline_files.insert(file_id, file_entry); + common_archive_files.insert(file_id); + } + + let metadata_file_size = header.metadata_file_size; + self.checkpoint_archives + .entry(archive_id) + .or_insert_with(|| CheckpointArchive { + metadata_file_size, + files: BTreeSet::new(), + archive_header_size: header_size, + disk_consistent_lsn, + }) + .files + .extend(common_archive_files.into_iter()); + } +} + +/// Reads remote storage file list, parses the data from the file paths and uses it to read every archive's header for every timeline, +/// thus restoring the file list for every timeline. +/// Due to the way headers are stored, S3 api for accessing file byte range is used, so we don't have to download an entire archive for its listing. +pub(super) async fn reconstruct_from_storage< + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +>( + storage: &S, +) -> anyhow::Result> { + let mut index = HashMap::::new(); + for (sync_id, remote_archives) in collect_archives(storage).await? { + let mut archive_header_downloads = remote_archives + .into_iter() + .map(|(archive_id, (archive, remote_path))| async move { + let mut header_buf = std::io::Cursor::new(Vec::new()); + storage + .download_range(&remote_path, 0, Some(archive.header_size), &mut header_buf) + .await + .map_err(|e| (e, archive_id))?; + let header_buf = header_buf.into_inner(); + let header = read_archive_header(&archive.archive_name, &mut header_buf.as_slice()) + .await + .map_err(|e| (e, archive_id))?; + Ok::<_, (anyhow::Error, ArchiveId)>((archive_id, archive.header_size, header)) + }) + .collect::>(); + + while let Some(header_data) = archive_header_downloads.next().await { + match header_data { + Ok((archive_id, header_size, header)) => { + index + .entry(sync_id) + .or_insert_with(RemoteTimeline::empty) + .set_archive_contents(archive_id.0, header, header_size); + } + Err((e, archive_id)) => { + bail!( + "Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}", + sync_id.0, sync_id.1, archive_id.0, + e + ); + } + } + } + } + Ok(index) +} + +async fn collect_archives< + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +>( + storage: &S, +) -> anyhow::Result>> { + let mut remote_archives = + HashMap::>::new(); + for (local_path, remote_path) in storage + .list() + .await + .context("Failed to list remote storage files")? 
+ .into_iter() + .map(|remote_path| (storage.local_path(&remote_path), remote_path)) + { + match local_path.and_then(|local_path| parse_archive_description(&local_path)) { + Ok((sync_id, archive_description)) => { + remote_archives.entry(sync_id).or_default().insert( + ArchiveId(archive_description.disk_consistent_lsn), + (archive_description, remote_path), + ); + } + Err(e) => error!( + "Failed to parse archive description from path '{:?}', reason: {:#}", + remote_path, e + ), + } + } + Ok(remote_archives) +} + +struct ArchiveDescription { + header_size: u64, + disk_consistent_lsn: Lsn, + archive_name: String, +} + +fn parse_archive_description( + archive_path: &Path, +) -> anyhow::Result<(TimelineSyncId, ArchiveDescription)> { + let (disk_consistent_lsn, header_size) = + parse_archive_name(archive_path).with_context(|| { + format!( + "Failed to parse timeline id from archive name '{}'", + archive_path.display() + ) + })?; + + let mut segments = archive_path + .iter() + .skip_while(|&segment| segment != TENANTS_SEGMENT_NAME); + let tenants_segment = segments.next().ok_or_else(|| { + anyhow!( + "Found no '{}' segment in the archive path '{}'", + TENANTS_SEGMENT_NAME, + archive_path.display() + ) + })?; + ensure!( + tenants_segment == TENANTS_SEGMENT_NAME, + "Failed to extract '{}' segment from archive path '{}'", + TENANTS_SEGMENT_NAME, + archive_path.display() + ); + let tenant_id = segments + .next() + .ok_or_else(|| { + anyhow!( + "Found no tenant id in the archive path '{}'", + archive_path.display() + ) + })? + .to_string_lossy() + .parse::() + .with_context(|| { + format!( + "Failed to parse tenant id from archive path '{}'", + archive_path.display() + ) + })?; + + let timelines_segment = segments.next().ok_or_else(|| { + anyhow!( + "Found no '{}' segment in the archive path '{}'", + TIMELINES_SEGMENT_NAME, + archive_path.display() + ) + })?; + ensure!( + timelines_segment == TIMELINES_SEGMENT_NAME, + "Failed to extract '{}' segment from archive path '{}'", + TIMELINES_SEGMENT_NAME, + archive_path.display() + ); + let timeline_id = segments + .next() + .ok_or_else(|| { + anyhow!( + "Found no timeline id in the archive path '{}'", + archive_path.display() + ) + })? + .to_string_lossy() + .parse::() + .with_context(|| { + format!( + "Failed to parse timeline id from archive path '{}'", + archive_path.display() + ) + })?; + + let archive_name = archive_path + .file_name() + .ok_or_else(|| anyhow!("Archive '{}' has no file name", archive_path.display()))? 
+ .to_string_lossy() + .to_string(); + Ok(( + TimelineSyncId(tenant_id, timeline_id), + ArchiveDescription { + header_size, + disk_consistent_lsn, + archive_name, + }, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn header_restoration_preserves_file_order() { + let header = ArchiveHeader { + files: vec![ + FileEntry { + size: 5, + subpath: "one".to_string(), + }, + FileEntry { + size: 1, + subpath: "two".to_string(), + }, + FileEntry { + size: 222, + subpath: "zero".to_string(), + }, + ], + metadata_file_size: 5, + }; + + let lsn = Lsn(1); + let mut remote_timeline = RemoteTimeline::empty(); + remote_timeline.set_archive_contents(lsn, header.clone(), 15); + + let (restored_header, _) = remote_timeline + .restore_header(ArchiveId(lsn)) + .expect("Should be able to restore header from a valid remote timeline"); + + assert_eq!( + header, restored_header, + "Header restoration should preserve file order" + ); + } +} diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index ef35762831..1190e87006 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -49,15 +49,15 @@ pub enum TenantState { Stopping, } -/// A remote storage timeline synchronization event, that needs another step -/// to be fully completed. +/// A remote storage timeline synchronization event, that needs another registration step +/// inside the manager to be fully completed. #[derive(Debug)] -pub enum PostTimelineSyncStep { +pub enum TimelineRegistration { /// The timeline cannot be synchronized anymore due to some sync issues. /// Needs to be removed from pageserver, to avoid further data diverging. Evict, /// A new timeline got downloaded and needs to be loaded into pageserver. - RegisterDownload, + Download, } impl fmt::Display for TenantState { @@ -117,9 +117,10 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) { pub fn perform_post_timeline_sync_steps( conf: &'static PageServerConf, - post_sync_steps: HashMap<(ZTenantId, ZTimelineId), PostTimelineSyncStep>, + post_sync_steps: HashMap<(ZTenantId, ZTimelineId), TimelineRegistration>, ) { if post_sync_steps.is_empty() { + debug!("no post-sync steps to perform"); return; } @@ -148,7 +149,7 @@ pub fn perform_post_timeline_sync_steps( for ((tenant_id, timeline_id), post_sync_step) in post_sync_steps { match post_sync_step { - PostTimelineSyncStep::Evict => { + TimelineRegistration::Evict => { if let Err(e) = get_repository_for_tenant(tenant_id) .and_then(|repo| repo.unload_timeline(timeline_id)) { @@ -158,7 +159,14 @@ pub fn perform_post_timeline_sync_steps( ) } } - PostTimelineSyncStep::RegisterDownload => { + TimelineRegistration::Download => { + // TODO remove later, when branching is added to remote storage sync + for missing_path in [conf.branches_path(&tenant_id), conf.tags_path(&tenant_id)] { + if !missing_path.exists() { + fs::create_dir_all(&missing_path).unwrap(); + } + } + // init repo updates Tenant state init_repo(conf, tenant_id); let new_repo = get_repository_for_tenant(tenant_id).unwrap();