mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
Compress checkpoint files before streaming into S3
This commit is contained in:
committed by
Kirill Bulatov
parent
cb4a8396fb
commit
e61732ca7c
56
Cargo.lock
generated
56
Cargo.lock
generated
@@ -41,6 +41,20 @@ version = "1.0.44"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5443ccbb270374a2b1055fc72da40e1f237809cd6bb0e97e66d264cd138473a6"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"zstd",
|
||||
"zstd-safe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.51"
|
||||
@@ -249,6 +263,9 @@ name = "cc"
|
||||
version = "1.0.71"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
@@ -845,6 +862,15 @@ version = "0.4.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.55"
|
||||
@@ -1101,6 +1127,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"async-trait",
|
||||
"bookfile",
|
||||
"byteorder",
|
||||
@@ -2510,3 +2537,32 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
"zenith_metrics",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
version = "0.7.0+zstd.1.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9428752481d8372e15b1bf779ea518a179ad6c771cca2d2c60e4fbff3cc2cd52"
|
||||
dependencies = [
|
||||
"zstd-safe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-safe"
|
||||
version = "3.1.0+zstd.1.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5aa1926623ad7fe406e090555387daf73db555b948134b4d73eac5eb08fb666d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-sys"
|
||||
version = "1.5.0+zstd.1.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4e6c094340240369025fc6b731b054ee2a834328fa584310ac96aa4baebdc465"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
]
|
||||
|
||||
@@ -41,6 +41,7 @@ nix = "0.23"
|
||||
once_cell = "1.8.0"
|
||||
|
||||
rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] }
|
||||
async-compression = {version = "0.3", features = ["zstd", "tokio"]}
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
|
||||
@@ -9,7 +9,7 @@ The Page Server has a few different duties:
|
||||
|
||||
S3 is the main fault-tolerant storage of all data, as there are no Page Server
|
||||
replicas. We use a separate fault-tolerant WAL service to reduce latency. It
|
||||
keeps track of WAL records which are not syncted to S3 yet.
|
||||
keeps track of WAL records which are not synced to S3 yet.
|
||||
|
||||
The Page Server consists of multiple threads that operate on a shared
|
||||
repository of page versions:
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
//! | pageserver | (schedule checkpoint upload) | upload/download |
|
||||
//! | | | loop |
|
||||
//! | | <------------------------------- | |
|
||||
//! | | (register downloaded layers) | |
|
||||
//! | | (register downloaded timelines) | |
|
||||
//! +------------------------+ +---------<-------+
|
||||
//! |
|
||||
//! |
|
||||
@@ -36,24 +36,27 @@
|
||||
//! | access to this storage |
|
||||
//! +------------------------+
|
||||
//!
|
||||
//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop unitialised, if configured so.
|
||||
//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop uninitialised, if configured so.
|
||||
//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata.
|
||||
//! If the storage sync loop was successfully started before, pageserver schedules the new image uploads after every checkpoint.
|
||||
//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint.
|
||||
//! See [`crate::layered_repository`] for the upload calls and the adjacent logic.
|
||||
//!
|
||||
//! The storage logic considers `image` as a set of local files, fully representing a certain timeline at given moment (identified with `disk_consistent_lsn`).
|
||||
//! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed
|
||||
//! by the storage upload, if enabled.
|
||||
//! When a certain image gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same image state.
|
||||
//! When a certain checkpoint gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same state.
|
||||
//! No files are deleted from either local or remote storage, only the missing ones locally/remotely get downloaded/uploaded, local metadata file will be overwritten
|
||||
//! when the newer timeline is downloaded.
|
||||
//! when the newer image is downloaded.
|
||||
//!
|
||||
//! Meanwhile, the loop inits the storage connection and checks the remote files stored.
|
||||
//! This is done once at startup only, relying on the fact that pageserver uses the storage alone (ergo, nobody else uploads the files to the storage but this server).
|
||||
//! Based on the remote image data, the storage sync logic queues image downloads, while accepting any potential upload tasks from pageserver and managing the tasks by their priority.
|
||||
//! On the image download, a [`crate::tenant_mgr::register_relish_download`] function is called to register the new image in pageserver, initializing all related threads and internal state.
|
||||
//! Based on the remote storage data, the sync logic queues timeline downloads, while accepting any potential upload tasks from pageserver and managing the tasks by their priority.
|
||||
//! On the timeline download, a [`crate::tenant_mgr::register_timeline_download`] function is called to register the new timeline in pageserver, initializing all related threads and internal state.
|
||||
//!
|
||||
//! When the pageserver terminates, the upload loop finishes a current image sync task (if any) and exits.
|
||||
//! To optimize S3 storage (and access), the sync loop compresses the checkpoint files before placing them to S3, and uncompresses them back, keeping track of timeline files and metadata.
|
||||
//! Also, the file remote file list is queried once only, at startup, to avoid possible extra costs and latency issues.
|
||||
//!
|
||||
//! When the pageserver terminates, the upload loop finishes a current sync task (if any) and exits.
|
||||
//!
|
||||
//! NOTES:
|
||||
//! * pageserver assumes it has exclusive write access to the remote storage. If supported, the way multiple pageservers can be separated in the same storage
|
||||
@@ -64,10 +67,6 @@
|
||||
//! 2. pageserver loads the timeline from disk for the first time
|
||||
//!
|
||||
//! * the uploads do not happen right after the upload registration: the sync loop might be occupied with other tasks, or tasks with bigger priority could be waiting already
|
||||
//!
|
||||
//! * all synchronization tasks (including the public API to register uploads and downloads and the sync queue management) happens on an image scale: a big set of remote files,
|
||||
//! enough to represent (and recover, if needed) a certain timeline state. On the contrary, all internal storage CRUD calls are made per reilsh file from those images.
|
||||
//! This way, the synchronization is able to download the image partially, if some state was synced before, but exposes correctly synced images only.
|
||||
|
||||
mod local_fs;
|
||||
mod rust_s3;
|
||||
|
||||
@@ -18,6 +18,9 @@ Current implementation
|
||||
* provides remote storage wrappers for AWS S3 and local FS
|
||||
* uploads layers, frozen by pageserver checkpoint thread
|
||||
* downloads and registers layers, found on the remote storage, but missing locally
|
||||
* uses compression when deals with files, for better S3 usage
|
||||
* maintains an index of what's stored remotely
|
||||
* evicts failing tasks and stops the corresponding timelines
|
||||
|
||||
No good optimisations or performance testing is done, the feature is disabled by default and gets polished over time.
|
||||
It's planned to deal with all questions that are currently on and prepare the feature to be enabled by default in cloud environments.
|
||||
@@ -27,21 +30,16 @@ It's planned to deal with all questions that are currently on and prepare the fe
|
||||
As mentioned, the backup component is rather new and under development currently, so not all things are done properly from the start.
|
||||
Here's the list of known compromises with comments:
|
||||
|
||||
* Remote storage model is the same as the `tenants/` directory contents of the pageserver's local workdir storage.
|
||||
This is relatively simple to implement, but may be costly to use in AWS S3: an initial data image contains ~782 relish files and a metadata file, ~31 MB combined.
|
||||
AWS charges per API call and for traffic either, layers are expected to be updated frequently, so this model most probably is ineffective.
|
||||
Additionally, pageservers might need to migrate images between tenants, which does not improve the situation.
|
||||
* Remote storage file model is currently a custom archive format, that's not possible to deserialize without a particular Rust code of ours (including `serde`).
|
||||
We also don't optimize the archivation and pack every timeline checkpoint separately, so the resulting blob's size that gets on S3 could be arbitrary.
|
||||
But, it's a single blob, which is way better than storing ~780 small files separately.
|
||||
|
||||
Storage sync API operates images when backing up or restoring a backup, so we're fluent to repack the layer contents the way we want to, which most probably will be done later.
|
||||
* Archive index restoration requires reading every blob's head.
|
||||
This could be avoided by a background thread/future storing the serialized index in the remote storage.
|
||||
|
||||
* no proper file comparison
|
||||
|
||||
Currently, every layer contains `Lsn` in their name, to map the data it holds against a certain DB state.
|
||||
Then the images with same ids and different `Lsn`'s are compared, files are considered equal if their local file paths are equal (for remote files, "local file path" is their download destination).
|
||||
No file contents assertion is done currently, but should be.
|
||||
AWS S3 returns file checksums during the `list` operation, so that can be used to ensure the backup consistency, but that needs further research and, since current pageserver impl also needs to deal with layer file checksums.
|
||||
|
||||
For now, due to this, we consider local workdir files as source of truth, not removing them ever and adjusting remote files instead, if image files mismatch.
|
||||
No file checksum assertion is done currently, but should be (AWS S3 returns file checksums during the `list` operation)
|
||||
|
||||
* sad rust-s3 api
|
||||
|
||||
@@ -60,7 +58,7 @@ Based on previous evaluation, even `rusoto-s3` could be a better choice over thi
|
||||
So far, we don't consider non-main images and don't adjust the remote storage based on GC thread loop results.
|
||||
Only checkpointer loop affects the remote storage.
|
||||
|
||||
* more layers should be downloaded on demand
|
||||
* more timelines should be downloaded on demand
|
||||
|
||||
Since we download and load remote layers into pageserver, there's a possibility a need for those layers' ancestors arise.
|
||||
Most probably, every downloaded image's ancestor is not present in locally too, but currently there's no logic for downloading such ancestors and their metadata,
|
||||
|
||||
@@ -355,7 +355,7 @@ mod pure_tests {
|
||||
.local_path(
|
||||
&storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?)
|
||||
)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
.expect("For a valid input, valid local path should be parsed"),
|
||||
"Should be able to parse metadata out of the correctly named remote delta file"
|
||||
);
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
628
pageserver/src/remote_storage/storage_sync/compression.rs
Normal file
628
pageserver/src/remote_storage/storage_sync/compression.rs
Normal file
@@ -0,0 +1,628 @@
|
||||
//! A set of methods to asynchronously compress and uncompress a stream of data, without holding the entire data in memory.
|
||||
//!
|
||||
//! For that, both comporess and uncompress functions operate buffered streams (currently hardcoded sice of [`ARCHIVE_STREAM_BUFFER_SIZE_BYTES`]),
|
||||
//! not attempting to hold the entire archive in memory.
|
||||
//!
|
||||
//! With those ideas, the compression is done with <a href="https://datatracker.ietf.org/doc/html/rfc8878">zstd</a> streaming compression algorithm via the `async-compression` crate.
|
||||
//! The create does not contain any knobs to tweak the compression, but otherwise is one of the only ones that's both async and has an API to manage the part of an acrhive.
|
||||
//! Zstd was picked as the best algorithm among the ones available in the crate, after testing the initial timeline file compression.
|
||||
//!
|
||||
//! Archiving is almost agnostic to timeline file types, with an exception of the metadata file, that's currently distinguished in the [un]compression code.
|
||||
//!
|
||||
//! Archive structure:
|
||||
//! +----------------------------------------+
|
||||
//! | header | file_1, ..., file_k, metadata |
|
||||
//! +----------------------------------------+
|
||||
//!
|
||||
//! The archive consists of two separate files:
|
||||
//! * header, that contains all files names and their sizes and relative paths in the timeline directory
|
||||
//! Header is a Rust structure, serialized into bytes and compressed with zstd.
|
||||
//! * files part, that has metadata file as the last one, all compressed with zstd into a single binary blob
|
||||
//!
|
||||
//! Header offset is stored in the file name, along with the `disk_consistent_lsn` from the metadata file.
|
||||
//! See [`parse_archive_name`] and [`ARCHIVE_EXTENSION`] for the name details, example: `00000000016B9150-.zst_9732`.
|
||||
//! This way, the header could be retrieved without reading an entire archive file.
|
||||
//!
|
||||
//! The files are stored with the metadata as the last file, to reduce the risk of corrupting the metadata file.
|
||||
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
future::Future,
|
||||
io::Cursor,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{self, AsyncReadExt, AsyncWriteExt},
|
||||
};
|
||||
use tracing::*;
|
||||
use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct ArchiveHeader {
|
||||
pub files: Vec<FileEntry>,
|
||||
// Metadata file name is known to the system, as its location relative to the timeline dir,
|
||||
// so no need to store anything but its size in bytes.
|
||||
pub metadata_file_size: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct FileEntry {
|
||||
/// Uncompressed file size, bytes.
|
||||
pub size: u64,
|
||||
/// A path, relative to the directory root, used when compressing the directory contents.
|
||||
pub subpath: String,
|
||||
}
|
||||
|
||||
const ARCHIVE_EXTENSION: &str = "-.zst_";
|
||||
const ARCHIVE_STREAM_BUFFER_SIZE_BYTES: usize = 4 * 1024 * 1024;
|
||||
|
||||
/// Streams an archive of files given into a stream target, defined by the closure.
|
||||
///
|
||||
/// The closure approach is picked for cases like S3, where we would need a name of the file before we can get a stream to write the bytes into.
|
||||
/// Current idea is to place the header size in the name of the file, to enable the fast partial remote file index restoration without actually reading remote storage file contents.
|
||||
///
|
||||
/// Performs the compression in multiple steps:
|
||||
/// * prepares an archive header, stripping the `source_dir` prefix from the `files`
|
||||
/// * generates the name of the archive
|
||||
/// * prepares archive producer future, knowing the header and the file list
|
||||
/// An `impl AsyncRead` and `impl AsyncWrite` pair of connected streams is created to implement the partial contents streaming.
|
||||
/// The writer end gets into the archive producer future, to put the header and a stream of compressed files.
|
||||
/// * prepares archive consumer future, by executing the provided closure
|
||||
/// The closure gets the reader end stream and the name of the file to cretate a future that would stream the file contents elsewhere.
|
||||
/// * runs and waits for both futures to complete
|
||||
/// * on a successful completion of both futures, hedader, its size and the user-defined consumer future return data is returned
|
||||
/// Due to the design above, the archive name and related data is visible inside the consumer future only, so it's possible to return the data,
|
||||
/// needed for future processing.
|
||||
pub async fn archive_files_as_stream<Cons, ConsRet, Fut>(
|
||||
source_dir: &Path,
|
||||
files: impl Iterator<Item = &PathBuf>,
|
||||
metadata: &TimelineMetadata,
|
||||
create_archive_consumer: Cons,
|
||||
) -> anyhow::Result<(ArchiveHeader, u64, ConsRet)>
|
||||
where
|
||||
Cons: FnOnce(Box<dyn io::AsyncRead + Unpin + Send + Sync + 'static>, String) -> Fut
|
||||
+ Send
|
||||
+ 'static,
|
||||
Fut: Future<Output = anyhow::Result<ConsRet>> + Send + 'static,
|
||||
ConsRet: Send + Sync + 'static,
|
||||
{
|
||||
let metadata_bytes = metadata
|
||||
.to_bytes()
|
||||
.context("Failed to create metadata bytes")?;
|
||||
let (archive_header, compressed_header_bytes) =
|
||||
prepare_header(source_dir, files, &metadata_bytes)
|
||||
.await
|
||||
.context("Failed to prepare file for archivation")?;
|
||||
|
||||
let header_size = compressed_header_bytes.len() as u64;
|
||||
let (write, read) = io::duplex(ARCHIVE_STREAM_BUFFER_SIZE_BYTES);
|
||||
let archive_filler = write_archive_contents(
|
||||
source_dir.to_path_buf(),
|
||||
archive_header.clone(),
|
||||
metadata_bytes,
|
||||
write,
|
||||
);
|
||||
let archive_name = archive_name(metadata.disk_consistent_lsn(), header_size);
|
||||
let archive_stream =
|
||||
Cursor::new(compressed_header_bytes).chain(ZstdEncoder::new(io::BufReader::new(read)));
|
||||
|
||||
let (archive_creation_result, archive_upload_result) = tokio::join!(
|
||||
tokio::spawn(archive_filler),
|
||||
tokio::spawn(async move {
|
||||
create_archive_consumer(Box::new(archive_stream), archive_name).await
|
||||
})
|
||||
);
|
||||
archive_creation_result
|
||||
.context("Failed to spawn archive creation future")?
|
||||
.context("Failed to create an archive")?;
|
||||
let upload_return_value = archive_upload_result
|
||||
.context("Failed to spawn archive upload future")?
|
||||
.context("Failed to upload the archive")?;
|
||||
|
||||
Ok((archive_header, header_size, upload_return_value))
|
||||
}
|
||||
|
||||
/// Similar to [`archive_files_as_stream`], creates a pair of streams to uncompress the 2nd part of the archive,
|
||||
/// that contains files and is located after the header.
|
||||
/// S3 allows downloading partial file contents for a given file key (i.e. name), to accomodate this retrieval,
|
||||
/// a closure is used.
|
||||
/// Same concepts with two concurrent futures, user-defined closure, future and return value apply here, but the
|
||||
/// consumer and the receiver ends are swapped, since the uncompression happens.
|
||||
pub async fn uncompress_file_stream_with_index<Prod, ProdRet, Fut>(
|
||||
destination_dir: PathBuf,
|
||||
files_to_skip: Arc<BTreeSet<PathBuf>>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
header: ArchiveHeader,
|
||||
header_size: u64,
|
||||
create_archive_file_part: Prod,
|
||||
) -> anyhow::Result<ProdRet>
|
||||
where
|
||||
Prod: FnOnce(Box<dyn io::AsyncWrite + Unpin + Send + Sync + 'static>, String) -> Fut
|
||||
+ Send
|
||||
+ 'static,
|
||||
Fut: Future<Output = anyhow::Result<ProdRet>> + Send + 'static,
|
||||
ProdRet: Send + Sync + 'static,
|
||||
{
|
||||
let (write, mut read) = io::duplex(ARCHIVE_STREAM_BUFFER_SIZE_BYTES);
|
||||
let archive_name = archive_name(disk_consistent_lsn, header_size);
|
||||
|
||||
let (archive_download_result, archive_uncompress_result) = tokio::join!(
|
||||
tokio::spawn(async move { create_archive_file_part(Box::new(write), archive_name).await }),
|
||||
tokio::spawn(async move {
|
||||
uncompress_with_header(&files_to_skip, &destination_dir, header, &mut read).await
|
||||
})
|
||||
);
|
||||
|
||||
let download_value = archive_download_result
|
||||
.context("Failed to spawn archive download future")?
|
||||
.context("Failed to download an archive")?;
|
||||
archive_uncompress_result
|
||||
.context("Failed to spawn archive uncompress future")?
|
||||
.context("Failed to uncompress the archive")?;
|
||||
|
||||
Ok(download_value)
|
||||
}
|
||||
|
||||
/// Reads archive header from the stream given:
|
||||
/// * parses the file name to get the header size
|
||||
/// * reads the exact amount of bytes
|
||||
/// * uncompresses and deserializes those
|
||||
pub async fn read_archive_header<A: io::AsyncRead + Send + Sync + Unpin>(
|
||||
archive_name: &str,
|
||||
from: &mut A,
|
||||
) -> anyhow::Result<ArchiveHeader> {
|
||||
let (_, header_size) = parse_archive_name(Path::new(archive_name))?;
|
||||
|
||||
let mut compressed_header_bytes = vec![0; header_size as usize];
|
||||
from.read_exact(&mut compressed_header_bytes)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read header header from the archive {}",
|
||||
archive_name
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut header_bytes = Vec::new();
|
||||
ZstdDecoder::new(io::BufReader::new(compressed_header_bytes.as_slice()))
|
||||
.read_to_end(&mut header_bytes)
|
||||
.await
|
||||
.context("Failed to decompress a header from the archive")?;
|
||||
header_bytes
|
||||
.flush()
|
||||
.await
|
||||
.context("Failed to decompress a header from the archive")?;
|
||||
|
||||
Ok(ArchiveHeader::des(&header_bytes)
|
||||
.context("Failed to deserialize a header from the archive")?)
|
||||
}
|
||||
|
||||
/// Reads the archive metadata out of the archive name:
|
||||
/// * `disk_consistent_lsn` of the checkpoint that was archived
|
||||
/// * size of the archive header
|
||||
pub fn parse_archive_name(archive_path: &Path) -> anyhow::Result<(Lsn, u64)> {
|
||||
let archive_name = archive_path
|
||||
.file_name()
|
||||
.ok_or_else(|| anyhow!("Archive '{}' has no file name", archive_path.display()))?
|
||||
.to_string_lossy();
|
||||
let (lsn_str, header_size_str) =
|
||||
archive_name.rsplit_once(ARCHIVE_EXTENSION).ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Archive '{}' has incorrect extension, expected to contain '{}'",
|
||||
archive_path.display(),
|
||||
ARCHIVE_EXTENSION
|
||||
)
|
||||
})?;
|
||||
let disk_consistent_lsn = Lsn::from_hex(lsn_str).with_context(|| {
|
||||
format!(
|
||||
"Archive '{}' has an invalid disk consistent lsn in its extension",
|
||||
archive_path.display(),
|
||||
)
|
||||
})?;
|
||||
let header_size = header_size_str.parse::<u64>().with_context(|| {
|
||||
format!(
|
||||
"Archive '{}' has an invalid a header offset number in its extension",
|
||||
archive_path.display(),
|
||||
)
|
||||
})?;
|
||||
Ok((disk_consistent_lsn, header_size))
|
||||
}
|
||||
|
||||
fn archive_name(disk_consistent_lsn: Lsn, header_size: u64) -> String {
|
||||
let archive_name = format!(
|
||||
"{:016X}{ARCHIVE_EXTENSION}{}",
|
||||
u64::from(disk_consistent_lsn),
|
||||
header_size,
|
||||
ARCHIVE_EXTENSION = ARCHIVE_EXTENSION,
|
||||
);
|
||||
archive_name
|
||||
}
|
||||
|
||||
async fn uncompress_with_header(
|
||||
files_to_skip: &BTreeSet<PathBuf>,
|
||||
destination_dir: &Path,
|
||||
header: ArchiveHeader,
|
||||
archive_after_header: impl io::AsyncRead + Send + Sync + Unpin,
|
||||
) -> anyhow::Result<()> {
|
||||
debug!("Uncompressing archive into {}", destination_dir.display());
|
||||
let mut archive = ZstdDecoder::new(io::BufReader::new(archive_after_header));
|
||||
|
||||
if !destination_dir.exists() {
|
||||
fs::create_dir_all(&destination_dir)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to create target directory at {}",
|
||||
destination_dir.display()
|
||||
)
|
||||
})?;
|
||||
} else if !destination_dir.is_dir() {
|
||||
bail!(
|
||||
"Destination path '{}' is not a valid directory",
|
||||
destination_dir.display()
|
||||
);
|
||||
}
|
||||
debug!("Will extract {} files from the archive", header.files.len());
|
||||
for entry in header.files {
|
||||
uncompress_entry(
|
||||
&mut archive,
|
||||
&entry.subpath,
|
||||
entry.size,
|
||||
files_to_skip,
|
||||
destination_dir,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to uncompress archive entry {:?}", entry))?;
|
||||
}
|
||||
uncompress_entry(
|
||||
&mut archive,
|
||||
METADATA_FILE_NAME,
|
||||
header.metadata_file_size,
|
||||
files_to_skip,
|
||||
destination_dir,
|
||||
)
|
||||
.await
|
||||
.context("Failed to uncompress the metadata entry")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn uncompress_entry(
|
||||
archive: &mut ZstdDecoder<io::BufReader<impl io::AsyncRead + Send + Sync + Unpin>>,
|
||||
entry_subpath: &str,
|
||||
entry_size: u64,
|
||||
files_to_skip: &BTreeSet<PathBuf>,
|
||||
destination_dir: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let destination_path = destination_dir.join(entry_subpath);
|
||||
if let Some(parent) = destination_path.parent() {
|
||||
fs::create_dir_all(parent).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to create parent directory for {}",
|
||||
destination_path.display()
|
||||
)
|
||||
})?;
|
||||
};
|
||||
|
||||
if files_to_skip.contains(destination_path.as_path()) {
|
||||
debug!("Skipping {}", destination_path.display());
|
||||
read_n_bytes(entry_size, archive, &mut io::sink())
|
||||
.await
|
||||
.context("Failed to skip bytes in the archive")?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut destination =
|
||||
io::BufWriter::new(fs::File::create(&destination_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to open file {} for extraction",
|
||||
destination_path.display()
|
||||
)
|
||||
})?);
|
||||
read_n_bytes(entry_size, archive, &mut destination)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to write extracted archive contents into file {}",
|
||||
destination_path.display()
|
||||
)
|
||||
})?;
|
||||
destination
|
||||
.flush()
|
||||
.await
|
||||
.context("Failed to flush the streaming archive bytes")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_archive_contents(
|
||||
source_dir: PathBuf,
|
||||
header: ArchiveHeader,
|
||||
metadata_bytes: Vec<u8>,
|
||||
mut archive_input: io::DuplexStream,
|
||||
) -> anyhow::Result<()> {
|
||||
debug!("Starting writing files into archive");
|
||||
for file_entry in header.files {
|
||||
let path = source_dir.join(&file_entry.subpath);
|
||||
let mut source_file =
|
||||
io::BufReader::new(fs::File::open(&path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to open file for archiving to path {}",
|
||||
path.display()
|
||||
)
|
||||
})?);
|
||||
let bytes_written = io::copy(&mut source_file, &mut archive_input)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to open add a file into archive, file path {}",
|
||||
path.display()
|
||||
)
|
||||
})?;
|
||||
ensure!(
|
||||
file_entry.size == bytes_written,
|
||||
"File {} was written to the archive incompletely",
|
||||
path.display()
|
||||
);
|
||||
trace!(
|
||||
"Added file '{}' ({} bytes) into the archive",
|
||||
path.display(),
|
||||
bytes_written
|
||||
);
|
||||
}
|
||||
let metadata_bytes_written = io::copy(&mut metadata_bytes.as_slice(), &mut archive_input)
|
||||
.await
|
||||
.with_context(|| "Failed to add metadata into the archive")?;
|
||||
ensure!(
|
||||
header.metadata_file_size == metadata_bytes_written,
|
||||
"Metadata file was written to the archive incompletely",
|
||||
);
|
||||
|
||||
archive_input
|
||||
.shutdown()
|
||||
.await
|
||||
.context("Failed to finalize the archive")?;
|
||||
debug!("Successfully streamed all files into the archive");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn prepare_header(
|
||||
source_dir: &Path,
|
||||
files: impl Iterator<Item = &PathBuf>,
|
||||
metadata_bytes: &[u8],
|
||||
) -> anyhow::Result<(ArchiveHeader, Vec<u8>)> {
|
||||
let mut archive_files = Vec::new();
|
||||
for file_path in files {
|
||||
let file_metadata = fs::metadata(file_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to read metadata during archive indexing for {}",
|
||||
file_path.display()
|
||||
)
|
||||
})?;
|
||||
ensure!(
|
||||
file_metadata.is_file(),
|
||||
"Archive indexed path {} is not a file",
|
||||
file_path.display()
|
||||
);
|
||||
|
||||
if file_path.file_name().and_then(|name| name.to_str()) != Some(METADATA_FILE_NAME) {
|
||||
let entry = FileEntry {
|
||||
subpath: file_path
|
||||
.strip_prefix(&source_dir)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"File '{}' does not belong to pageserver workspace",
|
||||
file_path.display()
|
||||
)
|
||||
})?
|
||||
.to_string_lossy()
|
||||
.to_string(),
|
||||
size: file_metadata.len(),
|
||||
};
|
||||
archive_files.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
let header = ArchiveHeader {
|
||||
files: archive_files,
|
||||
metadata_file_size: metadata_bytes.len() as u64,
|
||||
};
|
||||
|
||||
debug!("Appending a header for {} files", header.files.len());
|
||||
let header_bytes = header.ser().context("Failed to serialize a header")?;
|
||||
debug!("Header bytes len {}", header_bytes.len());
|
||||
let mut compressed_header_bytes = Vec::new();
|
||||
ZstdEncoder::new(io::BufReader::new(header_bytes.as_slice()))
|
||||
.read_to_end(&mut compressed_header_bytes)
|
||||
.await
|
||||
.context("Failed to compress header bytes")?;
|
||||
debug!(
|
||||
"Compressed header bytes len {}",
|
||||
compressed_header_bytes.len()
|
||||
);
|
||||
Ok((header, compressed_header_bytes))
|
||||
}
|
||||
|
||||
async fn read_n_bytes(
|
||||
n: u64,
|
||||
from: &mut (impl io::AsyncRead + Send + Sync + Unpin),
|
||||
into: &mut (impl io::AsyncWrite + Send + Sync + Unpin),
|
||||
) -> anyhow::Result<()> {
|
||||
let mut bytes_unread = n;
|
||||
while bytes_unread > 0 {
|
||||
let mut buf = vec![0; bytes_unread as usize];
|
||||
let bytes_read = from.read(&mut buf).await?;
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
into.write_all(&buf[0..bytes_read]).await?;
|
||||
bytes_unread -= bytes_read as u64;
|
||||
}
|
||||
ensure!(
|
||||
bytes_unread == 0,
|
||||
"Failed to read exactly {} bytes from the input, bytes unread: {}",
|
||||
n,
|
||||
bytes_unread,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::{fs, io::AsyncSeekExt};
|
||||
|
||||
use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn compress_and_uncompress() -> anyhow::Result<()> {
|
||||
let repo_harness = RepoHarness::create("compress_and_uncompress")?;
|
||||
let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
|
||||
init_directory(
|
||||
&timeline_dir,
|
||||
vec![
|
||||
("first", "first_contents"),
|
||||
("second", "second_contents"),
|
||||
(METADATA_FILE_NAME, "wrong_metadata"),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
let timeline_files = list_file_paths_with_contents(&timeline_dir).await?;
|
||||
assert_eq!(
|
||||
timeline_files,
|
||||
vec![
|
||||
(
|
||||
timeline_dir.join("first"),
|
||||
FileContents::Text("first_contents".to_string())
|
||||
),
|
||||
(
|
||||
timeline_dir.join(METADATA_FILE_NAME),
|
||||
FileContents::Text("wrong_metadata".to_string())
|
||||
),
|
||||
(
|
||||
timeline_dir.join("second"),
|
||||
FileContents::Text("second_contents".to_string())
|
||||
),
|
||||
],
|
||||
"Initial timeline contents should contain two normal files and a wrong metadata file"
|
||||
);
|
||||
|
||||
let metadata = TimelineMetadata::new(Lsn(0x30), None, None, Lsn(0), Lsn(0), Lsn(0));
|
||||
let paths_to_archive = timeline_files
|
||||
.into_iter()
|
||||
.map(|(path, _)| path)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let tempdir = tempfile::tempdir()?;
|
||||
let base_path = tempdir.path().to_path_buf();
|
||||
let (header, header_size, archive_target) = archive_files_as_stream(
|
||||
&timeline_dir,
|
||||
paths_to_archive.iter(),
|
||||
&metadata,
|
||||
move |mut archive_streamer, archive_name| async move {
|
||||
let archive_target = base_path.join(&archive_name);
|
||||
let mut archive_file = fs::File::create(&archive_target).await?;
|
||||
io::copy(&mut archive_streamer, &mut archive_file).await?;
|
||||
Ok(archive_target)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut file = fs::File::open(&archive_target).await?;
|
||||
file.seek(io::SeekFrom::Start(header_size)).await?;
|
||||
let target_dir = tempdir.path().join("extracted");
|
||||
uncompress_with_header(&BTreeSet::new(), &target_dir, header, file).await?;
|
||||
|
||||
let extracted_files = list_file_paths_with_contents(&target_dir).await?;
|
||||
|
||||
assert_eq!(
|
||||
extracted_files,
|
||||
vec![
|
||||
(
|
||||
target_dir.join("first"),
|
||||
FileContents::Text("first_contents".to_string())
|
||||
),
|
||||
(
|
||||
target_dir.join(METADATA_FILE_NAME),
|
||||
FileContents::Binary(metadata.to_bytes()?)
|
||||
),
|
||||
(
|
||||
target_dir.join("second"),
|
||||
FileContents::Text("second_contents".to_string())
|
||||
),
|
||||
],
|
||||
"Extracted files should contain all local timeline files besides its metadata, which should be taken from the arguments"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn init_directory(
|
||||
root: &Path,
|
||||
files_with_contents: Vec<(&str, &str)>,
|
||||
) -> anyhow::Result<()> {
|
||||
fs::create_dir_all(root).await?;
|
||||
for (file_name, contents) in files_with_contents {
|
||||
fs::File::create(root.join(file_name))
|
||||
.await?
|
||||
.write_all(contents.as_bytes())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord)]
|
||||
enum FileContents {
|
||||
Text(String),
|
||||
Binary(Vec<u8>),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FileContents {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Text(text) => f.debug_tuple("Text").field(text).finish(),
|
||||
Self::Binary(bytes) => f
|
||||
.debug_tuple("Binary")
|
||||
.field(&format!("{} bytes", bytes.len()))
|
||||
.finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_file_paths_with_contents(
|
||||
root: &Path,
|
||||
) -> anyhow::Result<Vec<(PathBuf, FileContents)>> {
|
||||
let mut file_paths = Vec::new();
|
||||
|
||||
let mut dir_listings = vec![fs::read_dir(root).await?];
|
||||
while let Some(mut dir_listing) = dir_listings.pop() {
|
||||
while let Some(entry) = dir_listing.next_entry().await? {
|
||||
let entry_path = entry.path();
|
||||
if entry_path.is_file() {
|
||||
let contents = match String::from_utf8(fs::read(&entry_path).await?) {
|
||||
Ok(text) => FileContents::Text(text),
|
||||
Err(e) => FileContents::Binary(e.into_bytes()),
|
||||
};
|
||||
file_paths.push((entry_path, contents));
|
||||
} else if entry_path.is_dir() {
|
||||
dir_listings.push(fs::read_dir(entry_path).await?);
|
||||
} else {
|
||||
info!(
|
||||
"Skipping path '{}' as it's not a file or a directory",
|
||||
entry_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
file_paths.sort();
|
||||
Ok(file_paths)
|
||||
}
|
||||
}
|
||||
375
pageserver/src/remote_storage/storage_sync/index.rs
Normal file
375
pageserver/src/remote_storage/storage_sync/index.rs
Normal file
@@ -0,0 +1,375 @@
|
||||
//! In-memory index to track the timeline files in the remote strorage's archives.
|
||||
//! Able to restore itself from the storage archive data and reconstruct archive indices on demand.
|
||||
//!
|
||||
//! The index is intended to be portable, so deliberately does not store any local paths inside.
|
||||
//! This way in the future, the index could be restored fast from its serialized stored form.
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::error;
|
||||
use zenith_utils::{
|
||||
lsn::Lsn,
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
|
||||
remote_storage::{
|
||||
storage_sync::compression::{parse_archive_name, FileEntry},
|
||||
RemoteStorage, TimelineSyncId,
|
||||
},
|
||||
};
|
||||
|
||||
use super::compression::{read_archive_header, ArchiveHeader};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub struct ArchiveId(pub(super) Lsn);
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
struct FileId(ArchiveId, ArchiveEntryNumber);
|
||||
|
||||
type ArchiveEntryNumber = usize;
|
||||
|
||||
/// All archives and files in them, representing a certain timeline.
|
||||
/// Uses file and archive IDs to reference those without ownership issues.
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct RemoteTimeline {
|
||||
timeline_files: BTreeMap<FileId, FileEntry>,
|
||||
checkpoint_archives: BTreeMap<ArchiveId, CheckpointArchive>,
|
||||
}
|
||||
|
||||
/// Archive metadata, enough to restore a header with the timeline data.
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct CheckpointArchive {
|
||||
disk_consistent_lsn: Lsn,
|
||||
metadata_file_size: u64,
|
||||
files: BTreeSet<FileId>,
|
||||
archive_header_size: u64,
|
||||
}
|
||||
|
||||
impl CheckpointArchive {
|
||||
pub fn disk_consistent_lsn(&self) -> Lsn {
|
||||
self.disk_consistent_lsn
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTimeline {
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
timeline_files: BTreeMap::new(),
|
||||
checkpoint_archives: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lists all relish files in the given remote timeline. Omits the metadata file.
|
||||
pub fn stored_files(&self, timeline_dir: &Path) -> BTreeSet<PathBuf> {
|
||||
self.timeline_files
|
||||
.values()
|
||||
.map(|file_entry| timeline_dir.join(&file_entry.subpath))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn stored_archives(&self) -> Vec<ArchiveId> {
|
||||
self.checkpoint_archives.keys().copied().collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn latest_disk_consistent_lsn(&self) -> Option<Lsn> {
|
||||
self.checkpoint_archives
|
||||
.keys()
|
||||
.last()
|
||||
.map(|archive_id| archive_id.0)
|
||||
}
|
||||
|
||||
pub fn contains_archive(&self, disk_consistent_lsn: Lsn) -> bool {
|
||||
self.checkpoint_archives
|
||||
.contains_key(&ArchiveId(disk_consistent_lsn))
|
||||
}
|
||||
|
||||
pub fn archive_data(&self, archive_id: ArchiveId) -> Option<&CheckpointArchive> {
|
||||
self.checkpoint_archives.get(&archive_id)
|
||||
}
|
||||
|
||||
/// Restores a header of a certain remote archive from the memory data.
|
||||
/// Returns the header and its compressed size in the archive, both can be used to uncompress that archive.
|
||||
pub fn restore_header(&self, archive_id: ArchiveId) -> anyhow::Result<(ArchiveHeader, u64)> {
|
||||
let archive = self
|
||||
.checkpoint_archives
|
||||
.get(&archive_id)
|
||||
.ok_or_else(|| anyhow!("Archive {:?} not found", archive_id))?;
|
||||
|
||||
let mut header_files = Vec::with_capacity(archive.files.len());
|
||||
for (expected_archive_position, archive_file) in archive.files.iter().enumerate() {
|
||||
let &FileId(archive_id, archive_position) = archive_file;
|
||||
ensure!(
|
||||
expected_archive_position == archive_position,
|
||||
"Archive header is corrupt, file # {} from archive {:?} header is missing",
|
||||
expected_archive_position,
|
||||
archive_id,
|
||||
);
|
||||
|
||||
let timeline_file = self.timeline_files.get(archive_file).ok_or_else(|| {
|
||||
anyhow!(
|
||||
"File with id {:?} not found for archive {:?}",
|
||||
archive_file,
|
||||
archive_id
|
||||
)
|
||||
})?;
|
||||
header_files.push(timeline_file.clone());
|
||||
}
|
||||
|
||||
Ok((
|
||||
ArchiveHeader {
|
||||
files: header_files,
|
||||
metadata_file_size: archive.metadata_file_size,
|
||||
},
|
||||
archive.archive_header_size,
|
||||
))
|
||||
}
|
||||
|
||||
/// Updates (creates, if necessary) the data about a certain archive contents.
|
||||
pub fn set_archive_contents(
|
||||
&mut self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
header: ArchiveHeader,
|
||||
header_size: u64,
|
||||
) {
|
||||
let archive_id = ArchiveId(disk_consistent_lsn);
|
||||
let mut common_archive_files = BTreeSet::new();
|
||||
for (file_index, file_entry) in header.files.into_iter().enumerate() {
|
||||
let file_id = FileId(archive_id, file_index);
|
||||
self.timeline_files.insert(file_id, file_entry);
|
||||
common_archive_files.insert(file_id);
|
||||
}
|
||||
|
||||
let metadata_file_size = header.metadata_file_size;
|
||||
self.checkpoint_archives
|
||||
.entry(archive_id)
|
||||
.or_insert_with(|| CheckpointArchive {
|
||||
metadata_file_size,
|
||||
files: BTreeSet::new(),
|
||||
archive_header_size: header_size,
|
||||
disk_consistent_lsn,
|
||||
})
|
||||
.files
|
||||
.extend(common_archive_files.into_iter());
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads remote storage file list, parses the data from the file paths and uses it to read every archive's header for every timeline,
|
||||
/// thus restoring the file list for every timeline.
|
||||
/// Due to the way headers are stored, S3 api for accessing file byte range is used, so we don't have to download an entire archive for its listing.
|
||||
pub(super) async fn reconstruct_from_storage<
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
storage: &S,
|
||||
) -> anyhow::Result<HashMap<TimelineSyncId, RemoteTimeline>> {
|
||||
let mut index = HashMap::<TimelineSyncId, RemoteTimeline>::new();
|
||||
for (sync_id, remote_archives) in collect_archives(storage).await? {
|
||||
let mut archive_header_downloads = remote_archives
|
||||
.into_iter()
|
||||
.map(|(archive_id, (archive, remote_path))| async move {
|
||||
let mut header_buf = std::io::Cursor::new(Vec::new());
|
||||
storage
|
||||
.download_range(&remote_path, 0, Some(archive.header_size), &mut header_buf)
|
||||
.await
|
||||
.map_err(|e| (e, archive_id))?;
|
||||
let header_buf = header_buf.into_inner();
|
||||
let header = read_archive_header(&archive.archive_name, &mut header_buf.as_slice())
|
||||
.await
|
||||
.map_err(|e| (e, archive_id))?;
|
||||
Ok::<_, (anyhow::Error, ArchiveId)>((archive_id, archive.header_size, header))
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
while let Some(header_data) = archive_header_downloads.next().await {
|
||||
match header_data {
|
||||
Ok((archive_id, header_size, header)) => {
|
||||
index
|
||||
.entry(sync_id)
|
||||
.or_insert_with(RemoteTimeline::empty)
|
||||
.set_archive_contents(archive_id.0, header, header_size);
|
||||
}
|
||||
Err((e, archive_id)) => {
|
||||
bail!(
|
||||
"Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}",
|
||||
sync_id.0, sync_id.1, archive_id.0,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
async fn collect_archives<
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
storage: &S,
|
||||
) -> anyhow::Result<HashMap<TimelineSyncId, BTreeMap<ArchiveId, (ArchiveDescription, P)>>> {
|
||||
let mut remote_archives =
|
||||
HashMap::<TimelineSyncId, BTreeMap<ArchiveId, (ArchiveDescription, P)>>::new();
|
||||
for (local_path, remote_path) in storage
|
||||
.list()
|
||||
.await
|
||||
.context("Failed to list remote storage files")?
|
||||
.into_iter()
|
||||
.map(|remote_path| (storage.local_path(&remote_path), remote_path))
|
||||
{
|
||||
match local_path.and_then(|local_path| parse_archive_description(&local_path)) {
|
||||
Ok((sync_id, archive_description)) => {
|
||||
remote_archives.entry(sync_id).or_default().insert(
|
||||
ArchiveId(archive_description.disk_consistent_lsn),
|
||||
(archive_description, remote_path),
|
||||
);
|
||||
}
|
||||
Err(e) => error!(
|
||||
"Failed to parse archive description from path '{:?}', reason: {:#}",
|
||||
remote_path, e
|
||||
),
|
||||
}
|
||||
}
|
||||
Ok(remote_archives)
|
||||
}
|
||||
|
||||
struct ArchiveDescription {
|
||||
header_size: u64,
|
||||
disk_consistent_lsn: Lsn,
|
||||
archive_name: String,
|
||||
}
|
||||
|
||||
fn parse_archive_description(
|
||||
archive_path: &Path,
|
||||
) -> anyhow::Result<(TimelineSyncId, ArchiveDescription)> {
|
||||
let (disk_consistent_lsn, header_size) =
|
||||
parse_archive_name(archive_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to parse timeline id from archive name '{}'",
|
||||
archive_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut segments = archive_path
|
||||
.iter()
|
||||
.skip_while(|&segment| segment != TENANTS_SEGMENT_NAME);
|
||||
let tenants_segment = segments.next().ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Found no '{}' segment in the archive path '{}'",
|
||||
TENANTS_SEGMENT_NAME,
|
||||
archive_path.display()
|
||||
)
|
||||
})?;
|
||||
ensure!(
|
||||
tenants_segment == TENANTS_SEGMENT_NAME,
|
||||
"Failed to extract '{}' segment from archive path '{}'",
|
||||
TENANTS_SEGMENT_NAME,
|
||||
archive_path.display()
|
||||
);
|
||||
let tenant_id = segments
|
||||
.next()
|
||||
.ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Found no tenant id in the archive path '{}'",
|
||||
archive_path.display()
|
||||
)
|
||||
})?
|
||||
.to_string_lossy()
|
||||
.parse::<ZTenantId>()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to parse tenant id from archive path '{}'",
|
||||
archive_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let timelines_segment = segments.next().ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Found no '{}' segment in the archive path '{}'",
|
||||
TIMELINES_SEGMENT_NAME,
|
||||
archive_path.display()
|
||||
)
|
||||
})?;
|
||||
ensure!(
|
||||
timelines_segment == TIMELINES_SEGMENT_NAME,
|
||||
"Failed to extract '{}' segment from archive path '{}'",
|
||||
TIMELINES_SEGMENT_NAME,
|
||||
archive_path.display()
|
||||
);
|
||||
let timeline_id = segments
|
||||
.next()
|
||||
.ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Found no timeline id in the archive path '{}'",
|
||||
archive_path.display()
|
||||
)
|
||||
})?
|
||||
.to_string_lossy()
|
||||
.parse::<ZTimelineId>()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to parse timeline id from archive path '{}'",
|
||||
archive_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let archive_name = archive_path
|
||||
.file_name()
|
||||
.ok_or_else(|| anyhow!("Archive '{}' has no file name", archive_path.display()))?
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
Ok((
|
||||
TimelineSyncId(tenant_id, timeline_id),
|
||||
ArchiveDescription {
|
||||
header_size,
|
||||
disk_consistent_lsn,
|
||||
archive_name,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn header_restoration_preserves_file_order() {
|
||||
let header = ArchiveHeader {
|
||||
files: vec![
|
||||
FileEntry {
|
||||
size: 5,
|
||||
subpath: "one".to_string(),
|
||||
},
|
||||
FileEntry {
|
||||
size: 1,
|
||||
subpath: "two".to_string(),
|
||||
},
|
||||
FileEntry {
|
||||
size: 222,
|
||||
subpath: "zero".to_string(),
|
||||
},
|
||||
],
|
||||
metadata_file_size: 5,
|
||||
};
|
||||
|
||||
let lsn = Lsn(1);
|
||||
let mut remote_timeline = RemoteTimeline::empty();
|
||||
remote_timeline.set_archive_contents(lsn, header.clone(), 15);
|
||||
|
||||
let (restored_header, _) = remote_timeline
|
||||
.restore_header(ArchiveId(lsn))
|
||||
.expect("Should be able to restore header from a valid remote timeline");
|
||||
|
||||
assert_eq!(
|
||||
header, restored_header,
|
||||
"Header restoration should preserve file order"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -49,15 +49,15 @@ pub enum TenantState {
|
||||
Stopping,
|
||||
}
|
||||
|
||||
/// A remote storage timeline synchronization event, that needs another step
|
||||
/// to be fully completed.
|
||||
/// A remote storage timeline synchronization event, that needs another registration step
|
||||
/// inside the manager to be fully completed.
|
||||
#[derive(Debug)]
|
||||
pub enum PostTimelineSyncStep {
|
||||
pub enum TimelineRegistration {
|
||||
/// The timeline cannot be synchronized anymore due to some sync issues.
|
||||
/// Needs to be removed from pageserver, to avoid further data diverging.
|
||||
Evict,
|
||||
/// A new timeline got downloaded and needs to be loaded into pageserver.
|
||||
RegisterDownload,
|
||||
Download,
|
||||
}
|
||||
|
||||
impl fmt::Display for TenantState {
|
||||
@@ -117,9 +117,10 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
|
||||
|
||||
pub fn perform_post_timeline_sync_steps(
|
||||
conf: &'static PageServerConf,
|
||||
post_sync_steps: HashMap<(ZTenantId, ZTimelineId), PostTimelineSyncStep>,
|
||||
post_sync_steps: HashMap<(ZTenantId, ZTimelineId), TimelineRegistration>,
|
||||
) {
|
||||
if post_sync_steps.is_empty() {
|
||||
debug!("no post-sync steps to perform");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -148,7 +149,7 @@ pub fn perform_post_timeline_sync_steps(
|
||||
|
||||
for ((tenant_id, timeline_id), post_sync_step) in post_sync_steps {
|
||||
match post_sync_step {
|
||||
PostTimelineSyncStep::Evict => {
|
||||
TimelineRegistration::Evict => {
|
||||
if let Err(e) = get_repository_for_tenant(tenant_id)
|
||||
.and_then(|repo| repo.unload_timeline(timeline_id))
|
||||
{
|
||||
@@ -158,7 +159,14 @@ pub fn perform_post_timeline_sync_steps(
|
||||
)
|
||||
}
|
||||
}
|
||||
PostTimelineSyncStep::RegisterDownload => {
|
||||
TimelineRegistration::Download => {
|
||||
// TODO remove later, when branching is added to remote storage sync
|
||||
for missing_path in [conf.branches_path(&tenant_id), conf.tags_path(&tenant_id)] {
|
||||
if !missing_path.exists() {
|
||||
fs::create_dir_all(&missing_path).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// init repo updates Tenant state
|
||||
init_repo(conf, tenant_id);
|
||||
let new_repo = get_repository_for_tenant(tenant_id).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user