From de37f982dba67eae85b64c48259a0a36dbcc0e09 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Wed, 4 May 2022 17:06:44 +0300 Subject: [PATCH] Share the remote storage as a crate --- Cargo.lock | 71 +-- control_plane/src/storage.rs | 17 +- docs/settings.md | 20 +- libs/remote_storage/Cargo.toml | 20 + libs/remote_storage/src/lib.rs | 232 ++++++++++ .../remote_storage/src}/local_fs.rs | 186 ++++---- .../remote_storage/src}/s3_bucket.rs | 147 +++---- pageserver/Cargo.toml | 8 +- pageserver/README.md | 6 +- pageserver/src/config.rs | 120 +---- pageserver/src/http/routes.rs | 29 +- pageserver/src/layered_repository.rs | 14 +- pageserver/src/lib.rs | 2 +- pageserver/src/remote_storage.rs | 412 ------------------ pageserver/src/repository.rs | 2 +- .../src/{remote_storage => }/storage_sync.rs | 373 +++++++++++++--- .../storage_sync/download.rs | 54 +-- .../storage_sync/index.rs | 0 .../storage_sync/upload.rs | 53 ++- pageserver/src/tenant_mgr.rs | 5 +- pageserver/src/timelines.rs | 2 +- safekeeper/Cargo.toml | 3 +- safekeeper/src/s3_offload.rs | 107 ++--- test_runner/fixtures/zenith_fixtures.py | 29 +- workspace_hack/Cargo.toml | 6 + 25 files changed, 961 insertions(+), 957 deletions(-) create mode 100644 libs/remote_storage/Cargo.toml create mode 100644 libs/remote_storage/src/lib.rs rename {pageserver/src/remote_storage => libs/remote_storage/src}/local_fs.rs (81%) rename {pageserver/src/remote_storage => libs/remote_storage/src}/s3_bucket.rs (74%) delete mode 100644 pageserver/src/remote_storage.rs rename pageserver/src/{remote_storage => }/storage_sync.rs (77%) rename pageserver/src/{remote_storage => }/storage_sync/download.rs (93%) rename pageserver/src/{remote_storage => }/storage_sync/index.rs (100%) rename pageserver/src/{remote_storage => }/storage_sync/upload.rs (93%) diff --git a/Cargo.lock b/Cargo.lock index ac40a2931f..148517a777 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,9 +48,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.57" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" dependencies = [ "backtrace", ] @@ -1700,9 +1700,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.10.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" [[package]] name = "oorandom" @@ -1763,7 +1763,6 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "byteorder", "bytes", "chrono", @@ -1791,8 +1790,7 @@ dependencies = [ "pprof", "rand", "regex", - "rusoto_core", - "rusoto_s3", + "remote_storage", "scopeguard", "serde", "serde_json", @@ -1804,7 +1802,6 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-stream", - "tokio-util 0.7.0", "toml_edit", "tracing", "url", @@ -2104,9 +2101,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.10.1" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07b0857a71a8cb765763950499cae2413c3f9cede1133478c43600d9e146890" +checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f" dependencies = [ "bytes", "prost-derive", @@ -2114,9 +2111,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.10.1" +version = "0.10.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "120fbe7988713f39d780a58cf1a7ef0d7ef66c6d87e5aa3438940c05357929f4" +checksum = "65a1118354442de7feb8a2a76f3d80ef01426bd45542c8c1fdffca41a758f846" dependencies = [ "bytes", "cfg-if", @@ -2347,6 +2344,23 @@ version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +[[package]] +name = "remote_storage" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "rusoto_core", + "rusoto_s3", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-util 0.7.0", + "tracing", + "workspace_hack", +] + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -2446,9 +2460,9 @@ dependencies = [ [[package]] name = "rusoto_core" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b4f000e8934c1b4f70adde180056812e7ea6b1a247952db8ee98c94cd3116cc" +checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2" dependencies = [ "async-trait", "base64", @@ -2471,9 +2485,9 @@ dependencies = [ [[package]] name = "rusoto_credential" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a46b67db7bb66f5541e44db22b0a02fed59c9603e146db3a9e633272d3bac2f" +checksum = "ee0a6c13db5aad6047b6a44ef023dbbc21a056b6dab5be3b79ce4283d5c02d05" dependencies = [ "async-trait", "chrono", @@ -2489,9 +2503,9 @@ dependencies = [ [[package]] name = "rusoto_s3" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027" +checksum = "7aae4677183411f6b0b412d66194ef5403293917d66e70ab118f07cc24c5b14d" dependencies = [ "async-trait", "bytes", @@ -2502,9 +2516,9 @@ dependencies = [ [[package]] name = "rusoto_signature" -version = "0.47.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6264e93384b90a747758bcc82079711eacf2e755c3a8b5091687b5349d870bcc" +checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272" dependencies = [ "base64", "bytes", @@ -2611,8 +2625,7 @@ dependencies = [ "postgres-protocol", "postgres_ffi", "regex", - "rusoto_core", - "rusoto_s3", + "remote_storage", "serde", "serde_json", "serde_with", @@ -3275,9 +3288,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30fb54bf1e446f44d870d260d99957e7d11fb9d0a0f5bd1a662ad1411cc103f9" +checksum = "5be9d60db39854b30b835107500cf0aca0b0d14d6e1c3de124217c23a29c2ddb" dependencies = [ "async-stream", "async-trait", @@ -3307,9 +3320,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03447cdc9eaf8feffb6412dcb27baf2db11669a6c4789f29da799aabfb99547" +checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1" dependencies = [ "prettyplease", "proc-macro2", @@ -3805,7 +3818,13 @@ dependencies = [ "clap 2.34.0", "either", "fail", + "futures-channel", + "futures-task", + "futures-util", + "generic-array", "hashbrown", + "hex", + "hyper", "indexmap", "itoa 0.4.8", "libc", diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 3a63bf6960..adb924d430 100644 --- a/control_plane/src/storage.rs +++ 
b/control_plane/src/storage.rs @@ -186,8 +186,6 @@ impl PageServerNode { ); io::stdout().flush().unwrap(); - let mut cmd = Command::new(self.env.pageserver_bin()?); - let repo_path = self.repo_path(); let mut args = vec!["-D", repo_path.to_str().unwrap()]; @@ -195,9 +193,11 @@ impl PageServerNode { args.extend(["-c", config_override]); } - fill_rust_env_vars(cmd.args(&args).arg("--daemonize")); + let mut cmd = Command::new(self.env.pageserver_bin()?); + let mut filled_cmd = fill_rust_env_vars(cmd.args(&args).arg("--daemonize")); + filled_cmd = fill_aws_secrets_vars(filled_cmd); - if !cmd.status()?.success() { + if !filled_cmd.status()?.success() { bail!( "Pageserver failed to start. See '{}' for details.", self.repo_path().join("pageserver.log").display() @@ -457,3 +457,12 @@ impl PageServerNode { Ok(timeline_info_response) } } + +fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { + for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] { + if let Ok(value) = std::env::var(env_key) { + cmd = cmd.env(env_key, value); + } + } + cmd +} diff --git a/docs/settings.md b/docs/settings.md index b3925528cd..017d349bb6 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -6,7 +6,6 @@ If there's no such file during `init` phase of the server, it creates the file i There's a possibility to pass an arbitrary config value to the pageserver binary as an argument: such values override the values in the config file, if any are specified for the same key and get into the final config during init phase. - ### Config example ```toml @@ -35,9 +34,9 @@ Yet, it validates the config values it can (e.g. postgres install dir) and error Note the `[remote_storage]` section: it's a [table](https://toml.io/en/v1.0.0#table) in TOML specification and -* either has to be placed in the config after the table-less values such as `initial_superuser_name = 'zenith_admin'` +- either has to be placed in the config after the table-less values such as `initial_superuser_name = 'zenith_admin'` -* or can be placed anywhere if rewritten in identical form as [inline table](https://toml.io/en/v1.0.0#inline-table): `remote_storage = {foo = 2}` +- or can be placed anywhere if rewritten in identical form as [inline table](https://toml.io/en/v1.0.0#inline-table): `remote_storage = {foo = 2}` ### Config values @@ -57,7 +56,7 @@ but it will trigger a checkpoint operation to get it back below the limit. `checkpoint_distance` also determines how much WAL needs to be kept -durable in the safekeeper. The safekeeper must have capacity to hold +durable in the safekeeper. The safekeeper must have capacity to hold this much WAL, with some headroom, otherwise you can get stuck in a situation where the safekeeper is full and stops accepting new WAL, but the pageserver is not flushing out and releasing the space in the @@ -72,7 +71,7 @@ The unit is # of bytes. Every `compaction_period` seconds, the page server checks if maintenance operations, like compaction, are needed on the layer -files. Default is 1 s, which should be fine. +files. Default is 1 s, which should be fine. #### compaction_target_size @@ -163,16 +162,12 @@ bucket_region = 'eu-north-1' # Optional, pageserver uses entire bucket if the prefix is not specified. 
prefix_in_bucket = '/some/prefix/' -# Access key to connect to the bucket ("login" part of the credentials) -access_key_id = 'SOMEKEYAAAAASADSAH*#' - -# Secret access key to connect to the bucket ("password" part of the credentials) -secret_access_key = 'SOMEsEcReTsd292v' - # S3 API query limit to avoid getting errors/throttling from AWS. concurrency_limit = 100 ``` +If no IAM bucket access is used during the remote storage usage, use the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables to set the access credentials. + ###### General remote storage configuration Pagesever allows only one remote storage configured concurrently and errors if parameters from multiple different remote configurations are used. @@ -183,13 +178,12 @@ Besides, there are parameters common for all types of remote storage that can be ```toml [remote_storage] # Max number of concurrent timeline synchronized (layers uploaded or downloaded) with the remote storage at the same time. -max_concurrent_timelines_sync = 50 +max_concurrent_syncs = 50 # Max number of errors a single task can have before it's considered failed and not attempted to run anymore. max_sync_errors = 10 ``` - ## safekeeper TODO diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml new file mode 100644 index 0000000000..291f6e50ac --- /dev/null +++ b/libs/remote_storage/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "remote_storage" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = { version = "1.0", features = ["backtrace"] } +tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] } +tokio-util = { version = "0.7", features = ["io"] } +tracing = "0.1.27" +rusoto_core = "0.48" +rusoto_s3 = "0.48" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" +async-trait = "0.1" + +workspace_hack = { version = "0.1", path = "../../workspace_hack" } + +[dev-dependencies] +tempfile = "3.2" diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs new file mode 100644 index 0000000000..9bbb855dd5 --- /dev/null +++ b/libs/remote_storage/src/lib.rs @@ -0,0 +1,232 @@ +//! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage. +//! No other modules from this tree are supposed to be used directly by the external code. +//! +//! [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations: +//! * [`local_fs`] allows to use local file system as an external storage +//! * [`s3_bucket`] uses AWS S3 bucket as an external storage +//! +mod local_fs; +mod s3_bucket; + +use std::{ + borrow::Cow, + collections::HashMap, + ffi::OsStr, + num::{NonZeroU32, NonZeroUsize}, + path::{Path, PathBuf}, +}; + +use anyhow::Context; +use tokio::io; +use tracing::info; + +pub use self::{ + local_fs::LocalFs, + s3_bucket::{S3Bucket, S3ObjectKey}, +}; + +/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage. +/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency +/// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach. +/// Both cases may trigger timeline download, that might download a lot of layers. This concurrency is limited by the clients internally, if needed. 
+pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50; +pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; +/// Currently, sync happens with AWS S3, that has two limits on requests per second: +/// ~200 RPS for IAM services +/// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html +/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests +/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ +pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; + +/// Storage (potentially remote) API to manage its state. +/// This storage tries to be unaware of any layered repository context, +/// providing basic CRUD operations for storage files. +#[async_trait::async_trait] +pub trait RemoteStorage: Send + Sync { + /// A way to uniquely reference a file in the remote storage. + type RemoteObjectId; + + /// Attempts to derive the storage path out of the local path, if the latter is correct. + fn remote_object_id(&self, local_path: &Path) -> anyhow::Result; + + /// Gets the download path of the given storage file. + fn local_path(&self, remote_object_id: &Self::RemoteObjectId) -> anyhow::Result; + + /// Lists all items the storage has right now. + async fn list(&self) -> anyhow::Result>; + + /// Streams the local file contents into remote into the remote storage entry. + async fn upload( + &self, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, + // S3 PUT request requires the content length to be specified, + // otherwise it starts to fail with the concurrent connection count increasing. + from_size_bytes: usize, + to: &Self::RemoteObjectId, + metadata: Option, + ) -> anyhow::Result<()>; + + /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer. + /// Returns the metadata, if any was stored with the file previously. + async fn download( + &self, + from: &Self::RemoteObjectId, + to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), + ) -> anyhow::Result>; + + /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer. + /// Returns the metadata, if any was stored with the file previously. + async fn download_byte_range( + &self, + from: &Self::RemoteObjectId, + start_inclusive: u64, + end_exclusive: Option, + to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), + ) -> anyhow::Result>; + + async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>; +} + +/// TODO kb +pub enum GenericRemoteStorage { + Local(LocalFs), + S3(S3Bucket), +} + +impl GenericRemoteStorage { + pub fn new( + working_directory: PathBuf, + storage_config: &RemoteStorageConfig, + ) -> anyhow::Result { + match &storage_config.storage { + RemoteStorageKind::LocalFs(root) => { + info!("Using fs root '{}' as a remote storage", root.display()); + LocalFs::new(root.clone(), working_directory).map(GenericRemoteStorage::Local) + } + RemoteStorageKind::AwsS3(s3_config) => { + info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'", + s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint); + S3Bucket::new(s3_config, working_directory).map(GenericRemoteStorage::S3) + } + } + } +} + +/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry. +/// Immutable, cannot be changed once the file is created. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StorageMetadata(HashMap); + +fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> { + if prefix == path { + anyhow::bail!( + "Prefix and the path are equal, cannot strip: '{}'", + prefix.display() + ) + } else { + path.strip_prefix(prefix).with_context(|| { + format!( + "Path '{}' is not prefixed with '{}'", + path.display(), + prefix.display(), + ) + }) + } +} + +/// External backup storage configuration, enough for creating a client for that storage. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteStorageConfig { + /// Max allowed number of concurrent sync operations between the API user and the remote storage. + pub max_concurrent_syncs: NonZeroUsize, + /// Max allowed errors before the sync task is considered failed and evicted. + pub max_sync_errors: NonZeroU32, + /// The storage connection configuration. + pub storage: RemoteStorageKind, +} + +/// A kind of a remote storage to connect to, with its connection configuration. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RemoteStorageKind { + /// Storage based on local file system. + /// Specify a root folder to place all stored files into. + LocalFs(PathBuf), + /// AWS S3 based storage, storing all files in the S3 bucket + /// specified by the config + AwsS3(S3Config), +} + +/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write). +#[derive(Clone, PartialEq, Eq)] +pub struct S3Config { + /// Name of the bucket to connect to. + pub bucket_name: String, + /// The region where the bucket is located at. + pub bucket_region: String, + /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once. + pub prefix_in_bucket: Option, + /// A base URL to send S3 requests to. + /// By default, the endpoint is derived from a region name, assuming it's + /// an AWS S3 region name, erroring on wrong region name. + /// Endpoint provides a way to support other S3 flavors and their regions. + /// + /// Example: `http://127.0.0.1:5000` + pub endpoint: Option, + /// AWS S3 has various limits on its API calls, we need not to exceed those. + /// See [`DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details. 
+ pub concurrency_limit: NonZeroUsize, +} + +impl std::fmt::Debug for S3Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("S3Config") + .field("bucket_name", &self.bucket_name) + .field("bucket_region", &self.bucket_region) + .field("prefix_in_bucket", &self.prefix_in_bucket) + .field("concurrency_limit", &self.concurrency_limit) + .finish() + } +} + +pub fn path_with_suffix_extension(original_path: impl AsRef, suffix: &str) -> PathBuf { + let new_extension = match original_path + .as_ref() + .extension() + .map(OsStr::to_string_lossy) + { + Some(extension) => Cow::Owned(format!("{extension}.{suffix}")), + None => Cow::Borrowed(suffix), + }; + original_path + .as_ref() + .with_extension(new_extension.as_ref()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_path_with_suffix_extension() { + let p = PathBuf::from("/foo/bar"); + assert_eq!( + &path_with_suffix_extension(&p, "temp").to_string_lossy(), + "/foo/bar.temp" + ); + let p = PathBuf::from("/foo/bar"); + assert_eq!( + &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(), + "/foo/bar.temp.temp" + ); + let p = PathBuf::from("/foo/bar.baz"); + assert_eq!( + &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(), + "/foo/bar.baz.temp.temp" + ); + let p = PathBuf::from("/foo/bar.baz"); + assert_eq!( + &path_with_suffix_extension(&p, ".temp").to_string_lossy(), + "/foo/bar.baz..temp" + ); + } +} diff --git a/pageserver/src/remote_storage/local_fs.rs b/libs/remote_storage/src/local_fs.rs similarity index 81% rename from pageserver/src/remote_storage/local_fs.rs rename to libs/remote_storage/src/local_fs.rs index 6772a4fbd6..50243352ee 100644 --- a/pageserver/src/remote_storage/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -1,7 +1,7 @@ //! Local filesystem acting as a remote storage. -//! Multiple pageservers can use the same "storage" of this kind by using different storage roots. +//! Multiple API users can use the same "storage" of this kind by using different storage roots. //! -//! This storage used in pageserver tests, but can also be used in cases when a certain persistent +//! This storage used in tests, but can also be used in cases when a certain persistent //! volume is mounted to the local FS. use std::{ @@ -17,18 +17,18 @@ use tokio::{ }; use tracing::*; -use crate::remote_storage::storage_sync::path_with_suffix_extension; +use crate::path_with_suffix_extension; use super::{strip_path_prefix, RemoteStorage, StorageMetadata}; pub struct LocalFs { - pageserver_workdir: &'static Path, - root: PathBuf, + working_directory: PathBuf, + storage_root: PathBuf, } impl LocalFs { /// Attempts to create local FS storage, along with its root directory. 
- pub fn new(root: PathBuf, pageserver_workdir: &'static Path) -> anyhow::Result { + pub fn new(root: PathBuf, working_directory: PathBuf) -> anyhow::Result { if !root.exists() { std::fs::create_dir_all(&root).with_context(|| { format!( @@ -38,15 +38,15 @@ impl LocalFs { })?; } Ok(Self { - pageserver_workdir, - root, + working_directory, + storage_root: root, }) } fn resolve_in_storage(&self, path: &Path) -> anyhow::Result { if path.is_relative() { - Ok(self.root.join(path)) - } else if path.starts_with(&self.root) { + Ok(self.storage_root.join(path)) + } else if path.starts_with(&self.storage_root) { Ok(path.to_path_buf()) } else { bail!( @@ -85,30 +85,30 @@ impl LocalFs { #[async_trait::async_trait] impl RemoteStorage for LocalFs { - type StoragePath = PathBuf; + type RemoteObjectId = PathBuf; - fn storage_path(&self, local_path: &Path) -> anyhow::Result { - Ok(self.root.join( - strip_path_prefix(self.pageserver_workdir, local_path) + fn remote_object_id(&self, local_path: &Path) -> anyhow::Result { + Ok(self.storage_root.join( + strip_path_prefix(&self.working_directory, local_path) .context("local path does not belong to this storage")?, )) } - fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result { - let relative_path = strip_path_prefix(&self.root, storage_path) + fn local_path(&self, storage_path: &Self::RemoteObjectId) -> anyhow::Result { + let relative_path = strip_path_prefix(&self.storage_root, storage_path) .context("local path does not belong to this storage")?; - Ok(self.pageserver_workdir.join(relative_path)) + Ok(self.working_directory.join(relative_path)) } - async fn list(&self) -> anyhow::Result> { - get_all_files(&self.root).await + async fn list(&self) -> anyhow::Result> { + get_all_files(&self.storage_root).await } async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, from_size_bytes: usize, - to: &Self::StoragePath, + to: &Self::RemoteObjectId, metadata: Option, ) -> anyhow::Result<()> { let target_file_path = self.resolve_in_storage(to)?; @@ -194,7 +194,7 @@ impl RemoteStorage for LocalFs { async fn download( &self, - from: &Self::StoragePath, + from: &Self::RemoteObjectId, to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), ) -> anyhow::Result> { let file_path = self.resolve_in_storage(from)?; @@ -229,9 +229,9 @@ impl RemoteStorage for LocalFs { } } - async fn download_range( + async fn download_byte_range( &self, - from: &Self::StoragePath, + from: &Self::RemoteObjectId, start_inclusive: u64, end_exclusive: Option, to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), @@ -288,7 +288,7 @@ impl RemoteStorage for LocalFs { } } - async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> { + async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> { let file_path = self.resolve_in_storage(path)?; if file_path.exists() && file_path.is_file() { Ok(fs::remove_file(file_path).await?) 
@@ -354,29 +354,30 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> #[cfg(test)] mod pure_tests { - use crate::{ - layered_repository::metadata::METADATA_FILE_NAME, - repository::repo_harness::{RepoHarness, TIMELINE_ID}, - }; + use tempfile::tempdir; use super::*; #[test] fn storage_path_positive() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("storage_path_positive")?; + let workdir = tempdir()?.path().to_owned(); + let storage_root = PathBuf::from("somewhere").join("else"); let storage = LocalFs { - pageserver_workdir: &repo_harness.conf.workdir, - root: storage_root.clone(), + working_directory: workdir.clone(), + storage_root: storage_root.clone(), }; - let local_path = repo_harness.timeline_path(&TIMELINE_ID).join("file_name"); - let expected_path = storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?); + let local_path = workdir + .join("timelines") + .join("some_timeline") + .join("file_name"); + let expected_path = storage_root.join(local_path.strip_prefix(&workdir)?); assert_eq!( expected_path, - storage.storage_path(&local_path).expect("Matching path should map to storage path normally"), - "File paths from pageserver workdir should be stored in local fs storage with the same path they have relative to the workdir" + storage.remote_object_id(&local_path).expect("Matching path should map to storage path normally"), + "File paths from workdir should be stored in local fs storage with the same path they have relative to the workdir" ); Ok(()) @@ -386,7 +387,7 @@ mod pure_tests { fn storage_path_negatives() -> anyhow::Result<()> { #[track_caller] fn storage_path_error(storage: &LocalFs, mismatching_path: &Path) -> String { - match storage.storage_path(mismatching_path) { + match storage.remote_object_id(mismatching_path) { Ok(wrong_path) => panic!( "Expected path '{}' to error, but got storage path: {:?}", mismatching_path.display(), @@ -396,16 +397,16 @@ mod pure_tests { } } - let repo_harness = RepoHarness::create("storage_path_negatives")?; + let workdir = tempdir()?.path().to_owned(); let storage_root = PathBuf::from("somewhere").join("else"); let storage = LocalFs { - pageserver_workdir: &repo_harness.conf.workdir, - root: storage_root, + working_directory: workdir.clone(), + storage_root, }; - let error_string = storage_path_error(&storage, &repo_harness.conf.workdir); + let error_string = storage_path_error(&storage, &workdir); assert!(error_string.contains("does not belong to this storage")); - assert!(error_string.contains(repo_harness.conf.workdir.to_str().unwrap())); + assert!(error_string.contains(workdir.to_str().unwrap())); let mismatching_path_str = "/something/else"; let error_message = storage_path_error(&storage, Path::new(mismatching_path_str)); @@ -414,7 +415,7 @@ mod pure_tests { "Error should mention wrong path" ); assert!( - error_message.contains(repo_harness.conf.workdir.to_str().unwrap()), + error_message.contains(workdir.to_str().unwrap()), "Error should mention server workdir" ); assert!(error_message.contains("does not belong to this storage")); @@ -424,29 +425,28 @@ mod pure_tests { #[test] fn local_path_positive() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("local_path_positive")?; + let workdir = tempdir()?.path().to_owned(); let storage_root = PathBuf::from("somewhere").join("else"); let storage = LocalFs { - pageserver_workdir: &repo_harness.conf.workdir, - root: storage_root.clone(), + working_directory: workdir.clone(), + storage_root: storage_root.clone(), 
}; let name = "not a metadata"; - let local_path = repo_harness.timeline_path(&TIMELINE_ID).join(name); + let local_path = workdir.join("timelines").join("some_timeline").join(name); assert_eq!( local_path, storage - .local_path( - &storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?) - ) + .local_path(&storage_root.join(local_path.strip_prefix(&workdir)?)) .expect("For a valid input, valid local path should be parsed"), "Should be able to parse metadata out of the correctly named remote delta file" ); - let local_metadata_path = repo_harness - .timeline_path(&TIMELINE_ID) - .join(METADATA_FILE_NAME); - let remote_metadata_path = storage.storage_path(&local_metadata_path)?; + let local_metadata_path = workdir + .join("timelines") + .join("some_timeline") + .join("metadata"); + let remote_metadata_path = storage.remote_object_id(&local_metadata_path)?; assert_eq!( local_metadata_path, storage @@ -472,11 +472,10 @@ mod pure_tests { } } - let repo_harness = RepoHarness::create("local_path_negatives")?; let storage_root = PathBuf::from("somewhere").join("else"); let storage = LocalFs { - pageserver_workdir: &repo_harness.conf.workdir, - root: storage_root, + working_directory: tempdir()?.path().to_owned(), + storage_root, }; let totally_wrong_path = "wrong_wrong_wrong"; @@ -488,16 +487,19 @@ mod pure_tests { #[test] fn download_destination_matches_original_path() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_destination_matches_original_path")?; - let original_path = repo_harness.timeline_path(&TIMELINE_ID).join("some name"); + let workdir = tempdir()?.path().to_owned(); + let original_path = workdir + .join("timelines") + .join("some_timeline") + .join("some name"); let storage_root = PathBuf::from("somewhere").join("else"); let dummy_storage = LocalFs { - pageserver_workdir: &repo_harness.conf.workdir, - root: storage_root, + working_directory: workdir, + storage_root, }; - let storage_path = dummy_storage.storage_path(&original_path)?; + let storage_path = dummy_storage.remote_object_id(&original_path)?; let download_destination = dummy_storage.local_path(&storage_path)?; assert_eq!( @@ -512,18 +514,17 @@ mod pure_tests { #[cfg(test)] mod fs_tests { use super::*; - use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID}; use std::{collections::HashMap, io::Write}; use tempfile::tempdir; #[tokio::test] async fn upload_file() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("upload_file")?; + let workdir = tempdir()?.path().to_owned(); let storage = create_storage()?; let (file, size) = create_file_for_upload( - &storage.pageserver_workdir.join("whatever"), + &storage.working_directory.join("whatever"), "whatever_contents", ) .await?; @@ -538,14 +539,14 @@ mod fs_tests { } assert!(storage.list().await?.is_empty()); - let target_path_1 = upload_dummy_file(&repo_harness, &storage, "upload_1", None).await?; + let target_path_1 = upload_dummy_file(&workdir, &storage, "upload_1", None).await?; assert_eq!( storage.list().await?, vec![target_path_1.clone()], "Should list a single file after first upload" ); - let target_path_2 = upload_dummy_file(&repo_harness, &storage, "upload_2", None).await?; + let target_path_2 = upload_dummy_file(&workdir, &storage, "upload_2", None).await?; assert_eq!( list_files_sorted(&storage).await?, vec![target_path_1.clone(), target_path_2.clone()], @@ -556,17 +557,16 @@ mod fs_tests { } fn create_storage() -> anyhow::Result { - let pageserver_workdir = 
Box::leak(Box::new(tempdir()?.path().to_owned())); - let storage = LocalFs::new(tempdir()?.path().to_owned(), pageserver_workdir)?; - Ok(storage) + LocalFs::new(tempdir()?.path().to_owned(), tempdir()?.path().to_owned()) } #[tokio::test] async fn download_file() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_file")?; + let workdir = tempdir()?.path().to_owned(); + let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let metadata = storage.download(&upload_target, &mut content_bytes).await?; @@ -597,14 +597,15 @@ mod fs_tests { #[tokio::test] async fn download_file_range_positive() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_file_range_positive")?; + let workdir = tempdir()?.path().to_owned(); + let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let metadata = storage - .download_range(&upload_target, 0, None, &mut full_range_bytes) + .download_byte_range(&upload_target, 0, None, &mut full_range_bytes) .await?; assert!( metadata.is_none(), @@ -620,7 +621,7 @@ mod fs_tests { let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let same_byte = 1_000_000_000; let metadata = storage - .download_range( + .download_byte_range( &upload_target, same_byte, Some(same_byte + 1), // exclusive end @@ -642,7 +643,7 @@ mod fs_tests { let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let metadata = storage - .download_range( + .download_byte_range( &upload_target, 0, Some(first_part_local.len() as u64), @@ -664,7 +665,7 @@ mod fs_tests { let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let metadata = storage - .download_range( + .download_byte_range( &upload_target, first_part_local.len() as u64, Some((first_part_local.len() + second_part_local.len()) as u64), @@ -689,16 +690,17 @@ mod fs_tests { #[tokio::test] async fn download_file_range_negative() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_file_range_negative")?; + let workdir = tempdir()?.path().to_owned(); + let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; let start = 10000; let end = 234; assert!(start > end, "Should test an incorrect range"); match storage - .download_range(&upload_target, start, Some(end), &mut io::sink()) + .download_byte_range(&upload_target, start, Some(end), &mut io::sink()) .await { Ok(_) => panic!("Should not allow downloading wrong ranges"), @@ -712,7 +714,7 @@ mod fs_tests { let non_existing_path = PathBuf::from("somewhere").join("else"); match storage - .download_range(&non_existing_path, 1, Some(3), &mut io::sink()) + .download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink()) .await { Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"), @@ -727,10 +729,11 @@ mod fs_tests { 
#[tokio::test] async fn delete_file() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("delete_file")?; + let workdir = tempdir()?.path().to_owned(); + let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; storage.delete(&upload_target).await?; assert!(storage.list().await?.is_empty()); @@ -748,7 +751,8 @@ mod fs_tests { #[tokio::test] async fn file_with_metadata() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_file")?; + let workdir = tempdir()?.path().to_owned(); + let storage = create_storage()?; let upload_name = "upload_1"; let metadata = StorageMetadata(HashMap::from([ @@ -756,7 +760,7 @@ mod fs_tests { ("two".to_string(), "2".to_string()), ])); let upload_target = - upload_dummy_file(&repo_harness, &storage, upload_name, Some(metadata.clone())).await?; + upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?; let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?; @@ -780,7 +784,7 @@ mod fs_tests { let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new())); let partial_download_metadata = storage - .download_range( + .download_byte_range( &upload_target, 0, Some(first_part_local.len() as u64), @@ -805,16 +809,16 @@ mod fs_tests { } async fn upload_dummy_file( - harness: &RepoHarness<'_>, + workdir: &Path, storage: &LocalFs, name: &str, metadata: Option, ) -> anyhow::Result { - let timeline_path = harness.timeline_path(&TIMELINE_ID); - let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?; - let storage_path = storage.root.join(relative_timeline_path).join(name); + let timeline_path = workdir.join("timelines").join("some_timeline"); + let relative_timeline_path = timeline_path.strip_prefix(&workdir)?; + let storage_path = storage.storage_root.join(relative_timeline_path).join(name); - let from_path = storage.pageserver_workdir.join(name); + let from_path = storage.working_directory.join(name); let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?; storage.upload(file, size, &storage_path, metadata).await?; Ok(storage_path) diff --git a/pageserver/src/remote_storage/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs similarity index 74% rename from pageserver/src/remote_storage/s3_bucket.rs rename to libs/remote_storage/src/s3_bucket.rs index 73d828d150..01aaf7ca7e 100644 --- a/pageserver/src/remote_storage/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -1,7 +1,7 @@ //! AWS S3 storage wrapper around `rusoto` library. //! //! Respects `prefix_in_bucket` property from [`S3Config`], -//! allowing multiple pageservers to independently work with the same S3 bucket, if +//! allowing multiple api users to independently work with the same S3 bucket, if //! their bucket prefixes are both specified and different. 
use std::path::{Path, PathBuf}; @@ -19,16 +19,13 @@ use tokio::{io, sync::Semaphore}; use tokio_util::io::ReaderStream; use tracing::debug; -use crate::{ - config::S3Config, - remote_storage::{strip_path_prefix, RemoteStorage}, -}; +use crate::{strip_path_prefix, RemoteStorage, S3Config}; use super::StorageMetadata; -const S3_FILE_SEPARATOR: char = '/'; +const S3_PREFIX_SEPARATOR: char = '/'; -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)] pub struct S3ObjectKey(String); impl S3ObjectKey { @@ -36,11 +33,7 @@ impl S3ObjectKey { &self.0 } - fn download_destination( - &self, - pageserver_workdir: &Path, - prefix_to_strip: Option<&str>, - ) -> PathBuf { + fn download_destination(&self, workdir: &Path, prefix_to_strip: Option<&str>) -> PathBuf { let path_without_prefix = match prefix_to_strip { Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| { panic!( @@ -51,9 +44,9 @@ impl S3ObjectKey { None => &self.0, }; - pageserver_workdir.join( + workdir.join( path_without_prefix - .split(S3_FILE_SEPARATOR) + .split(S3_PREFIX_SEPARATOR) .collect::(), ) } @@ -61,7 +54,7 @@ impl S3ObjectKey { /// AWS S3 storage. pub struct S3Bucket { - pageserver_workdir: &'static Path, + workdir: PathBuf, client: S3Client, bucket_name: String, prefix_in_bucket: Option, @@ -73,7 +66,7 @@ pub struct S3Bucket { impl S3Bucket { /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided. - pub fn new(aws_config: &S3Config, pageserver_workdir: &'static Path) -> anyhow::Result { + pub fn new(aws_config: &S3Config, workdir: PathBuf) -> anyhow::Result { debug!( "Creating s3 remote storage for S3 bucket {}", aws_config.bucket_name @@ -89,8 +82,11 @@ impl S3Bucket { .context("Failed to parse the s3 region from config")?, }; let request_dispatcher = HttpClient::new().context("Failed to create S3 http client")?; - let client = if aws_config.access_key_id.is_none() && aws_config.secret_access_key.is_none() - { + + let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok(); + let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok(); + + let client = if access_key_id.is_none() && secret_access_key.is_none() { debug!("Using IAM-based AWS access"); S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region) } else { @@ -98,8 +94,8 @@ impl S3Bucket { S3Client::new_with( request_dispatcher, StaticProvider::new_minimal( - aws_config.access_key_id.clone().unwrap_or_default(), - aws_config.secret_access_key.clone().unwrap_or_default(), + access_key_id.unwrap_or_default(), + secret_access_key.unwrap_or_default(), ), region, ) @@ -107,12 +103,12 @@ impl S3Bucket { let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| { let mut prefix = prefix; - while prefix.starts_with(S3_FILE_SEPARATOR) { + while prefix.starts_with(S3_PREFIX_SEPARATOR) { prefix = &prefix[1..] 
} let mut prefix = prefix.to_string(); - while prefix.ends_with(S3_FILE_SEPARATOR) { + while prefix.ends_with(S3_PREFIX_SEPARATOR) { prefix.pop(); } prefix @@ -120,7 +116,7 @@ impl S3Bucket { Ok(Self { client, - pageserver_workdir, + workdir, bucket_name: aws_config.bucket_name.clone(), prefix_in_bucket, concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()), @@ -130,24 +126,23 @@ impl S3Bucket { #[async_trait::async_trait] impl RemoteStorage for S3Bucket { - type StoragePath = S3ObjectKey; + type RemoteObjectId = S3ObjectKey; - fn storage_path(&self, local_path: &Path) -> anyhow::Result { - let relative_path = strip_path_prefix(self.pageserver_workdir, local_path)?; + fn remote_object_id(&self, local_path: &Path) -> anyhow::Result { + let relative_path = strip_path_prefix(&self.workdir, local_path)?; let mut key = self.prefix_in_bucket.clone().unwrap_or_default(); for segment in relative_path { - key.push(S3_FILE_SEPARATOR); + key.push(S3_PREFIX_SEPARATOR); key.push_str(&segment.to_string_lossy()); } Ok(S3ObjectKey(key)) } - fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result { - Ok(storage_path - .download_destination(self.pageserver_workdir, self.prefix_in_bucket.as_deref())) + fn local_path(&self, storage_path: &Self::RemoteObjectId) -> anyhow::Result { + Ok(storage_path.download_destination(&self.workdir, self.prefix_in_bucket.as_deref())) } - async fn list(&self) -> anyhow::Result> { + async fn list(&self) -> anyhow::Result> { let mut document_keys = Vec::new(); let mut continuation_token = None; @@ -187,7 +182,7 @@ impl RemoteStorage for S3Bucket { &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, from_size_bytes: usize, - to: &Self::StoragePath, + to: &Self::RemoteObjectId, metadata: Option, ) -> anyhow::Result<()> { let _guard = self @@ -212,7 +207,7 @@ impl RemoteStorage for S3Bucket { async fn download( &self, - from: &Self::StoragePath, + from: &Self::RemoteObjectId, to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), ) -> anyhow::Result> { let _guard = self @@ -237,9 +232,9 @@ impl RemoteStorage for S3Bucket { Ok(object_output.metadata.map(StorageMetadata)) } - async fn download_range( + async fn download_byte_range( &self, - from: &Self::StoragePath, + from: &Self::RemoteObjectId, start_inclusive: u64, end_exclusive: Option, to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), @@ -274,7 +269,7 @@ impl RemoteStorage for S3Bucket { Ok(object_output.metadata.map(StorageMetadata)) } - async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> { + async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> { let _guard = self .concurrency_limiter .acquire() @@ -293,34 +288,30 @@ impl RemoteStorage for S3Bucket { #[cfg(test)] mod tests { - use crate::{ - layered_repository::metadata::METADATA_FILE_NAME, - repository::repo_harness::{RepoHarness, TIMELINE_ID}, - }; + use tempfile::tempdir; use super::*; #[test] fn download_destination() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_destination")?; - - let local_path = repo_harness.timeline_path(&TIMELINE_ID).join("test_name"); - let relative_path = local_path.strip_prefix(&repo_harness.conf.workdir)?; + let workdir = tempdir()?.path().to_owned(); + let local_path = workdir.join("one").join("two").join("test_name"); + let relative_path = local_path.strip_prefix(&workdir)?; let key = S3ObjectKey(format!( "{}{}", - S3_FILE_SEPARATOR, + S3_PREFIX_SEPARATOR, relative_path .iter() .map(|segment| segment.to_str().unwrap()) 
.collect::>() - .join(&S3_FILE_SEPARATOR.to_string()), + .join(&S3_PREFIX_SEPARATOR.to_string()), )); assert_eq!( local_path, - key.download_destination(&repo_harness.conf.workdir, None), - "Download destination should consist of s3 path joined with the pageserver workdir prefix" + key.download_destination(&workdir, None), + "Download destination should consist of s3 path joined with the workdir prefix" ); Ok(()) @@ -328,24 +319,21 @@ mod tests { #[test] fn storage_path_positive() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("storage_path_positive")?; + let workdir = tempdir()?.path().to_owned(); let segment_1 = "matching"; let segment_2 = "file"; - let local_path = &repo_harness.conf.workdir.join(segment_1).join(segment_2); + let local_path = &workdir.join(segment_1).join(segment_2); - let storage = dummy_storage(&repo_harness.conf.workdir); + let storage = dummy_storage(workdir); let expected_key = S3ObjectKey(format!( - "{}{SEPARATOR}{}{SEPARATOR}{}", + "{}{S3_PREFIX_SEPARATOR}{segment_1}{S3_PREFIX_SEPARATOR}{segment_2}", storage.prefix_in_bucket.as_deref().unwrap_or_default(), - segment_1, - segment_2, - SEPARATOR = S3_FILE_SEPARATOR, )); let actual_key = storage - .storage_path(local_path) + .remote_object_id(local_path) .expect("Matching path should map to S3 path normally"); assert_eq!( expected_key, @@ -360,7 +348,7 @@ mod tests { fn storage_path_negatives() -> anyhow::Result<()> { #[track_caller] fn storage_path_error(storage: &S3Bucket, mismatching_path: &Path) -> String { - match storage.storage_path(mismatching_path) { + match storage.remote_object_id(mismatching_path) { Ok(wrong_key) => panic!( "Expected path '{}' to error, but got S3 key: {:?}", mismatching_path.display(), @@ -370,10 +358,10 @@ mod tests { } } - let repo_harness = RepoHarness::create("storage_path_negatives")?; - let storage = dummy_storage(&repo_harness.conf.workdir); + let workdir = tempdir()?.path().to_owned(); + let storage = dummy_storage(workdir.clone()); - let error_message = storage_path_error(&storage, &repo_harness.conf.workdir); + let error_message = storage_path_error(&storage, &workdir); assert!( error_message.contains("Prefix and the path are equal"), "Message '{}' does not contain the required string", @@ -387,7 +375,7 @@ mod tests { "Error should mention wrong path" ); assert!( - error_message.contains(repo_harness.conf.workdir.to_str().unwrap()), + error_message.contains(workdir.to_str().unwrap()), "Error should mention server workdir" ); assert!( @@ -401,20 +389,17 @@ mod tests { #[test] fn local_path_positive() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("local_path_positive")?; - let storage = dummy_storage(&repo_harness.conf.workdir); - let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID); - let relative_timeline_path = timeline_dir.strip_prefix(&repo_harness.conf.workdir)?; + let workdir = tempdir()?.path().to_owned(); + let storage = dummy_storage(workdir.clone()); + let timeline_dir = workdir.join("timelines").join("test_timeline"); + let relative_timeline_path = timeline_dir.strip_prefix(&workdir)?; let s3_key = create_s3_key( &relative_timeline_path.join("not a metadata"), storage.prefix_in_bucket.as_deref(), ); assert_eq!( - s3_key.download_destination( - &repo_harness.conf.workdir, - storage.prefix_in_bucket.as_deref() - ), + s3_key.download_destination(&workdir, storage.prefix_in_bucket.as_deref()), storage .local_path(&s3_key) .expect("For a valid input, valid S3 info should be parsed"), @@ -422,14 +407,11 @@ mod tests { ); let 
s3_key = create_s3_key( - &relative_timeline_path.join(METADATA_FILE_NAME), + &relative_timeline_path.join("metadata"), storage.prefix_in_bucket.as_deref(), ); assert_eq!( - s3_key.download_destination( - &repo_harness.conf.workdir, - storage.prefix_in_bucket.as_deref() - ), + s3_key.download_destination(&workdir, storage.prefix_in_bucket.as_deref()), storage .local_path(&s3_key) .expect("For a valid input, valid S3 info should be parsed"), @@ -441,12 +423,15 @@ mod tests { #[test] fn download_destination_matches_original_path() -> anyhow::Result<()> { - let repo_harness = RepoHarness::create("download_destination_matches_original_path")?; - let original_path = repo_harness.timeline_path(&TIMELINE_ID).join("some name"); + let workdir = tempdir()?.path().to_owned(); + let original_path = workdir + .join("timelines") + .join("some_timeline") + .join("some name"); - let dummy_storage = dummy_storage(&repo_harness.conf.workdir); + let dummy_storage = dummy_storage(workdir); - let key = dummy_storage.storage_path(&original_path)?; + let key = dummy_storage.remote_object_id(&original_path)?; let download_destination = dummy_storage.local_path(&key)?; assert_eq!( @@ -457,9 +442,9 @@ mod tests { Ok(()) } - fn dummy_storage(pageserver_workdir: &'static Path) -> S3Bucket { + fn dummy_storage(workdir: PathBuf) -> S3Bucket { S3Bucket { - pageserver_workdir, + workdir, client: S3Client::new("us-east-1".parse().unwrap()), bucket_name: "dummy-bucket".to_string(), prefix_in_bucket: Some("dummy_prefix/".to_string()), @@ -471,7 +456,7 @@ mod tests { S3ObjectKey(relative_file_path.iter().fold( prefix.unwrap_or_default().to_string(), |mut path_string, segment| { - path_string.push(S3_FILE_SEPARATOR); + path_string.push(S3_PREFIX_SEPARATOR); path_string.push_str(segment.to_str().unwrap()); path_string }, diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 23c16dd5be..d4cceafc61 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [features] # It is simpler infra-wise to have failpoints enabled by default -# It shouldnt affect perf in any way because failpoints +# It shouldn't affect perf in any way because failpoints # are not placed in hot code paths default = ["failpoints"] profiling = ["pprof"] @@ -25,7 +25,6 @@ lazy_static = "1.4.0" clap = "3.0" daemonize = "0.4.1" tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] } -tokio-util = { version = "0.7", features = ["io"] } postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } @@ -54,13 +53,10 @@ once_cell = "1.8.0" crossbeam-utils = "0.8.5" fail = "0.5.0" -rusoto_core = "0.47" -rusoto_s3 = "0.47" -async-trait = "0.1" - postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } +remote_storage = { path = "../libs/remote_storage" } workspace_hack = { version = "0.1", path = "../workspace_hack" } [dev-dependencies] diff --git a/pageserver/README.md b/pageserver/README.md index 1fd627785c..cf841d1e46 100644 --- a/pageserver/README.md +++ b/pageserver/README.md @@ -135,7 +135,7 @@ The backup service is disabled by default and can be enabled to interact with a CLI examples: * Local FS: 
`${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"` -* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"` +* AWS S3 : `env AWS_ACCESS_KEY_ID='SOMEKEYAAAAASADSAH*#' AWS_SECRET_ACCESS_KEY='SOMEsEcReTsd292v' ${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/'}"` For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS. For local S3 installations, refer to the their documentation for name format and credentials. @@ -155,11 +155,9 @@ or bucket_name = 'some-sample-bucket' bucket_region = 'eu-north-1' prefix_in_bucket = '/test_prefix/' -access_key_id = 'SOMEKEYAAAAASADSAH*#' -secret_access_key = 'SOMEsEcReTsd292v' ``` -Also, `AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` variables can be used to specify the credentials instead of any of the ways above. +`AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` env variables can be used to specify the S3 credentials if needed. TODO: Sharding -------------------- diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 14ca976448..5257732c5c 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -5,6 +5,7 @@ //! See also `settings.md` for better description on every parameter. use anyhow::{anyhow, bail, ensure, Context, Result}; +use remote_storage::{RemoteStorageConfig, RemoteStorageKind, S3Config}; use std::env; use std::num::{NonZeroU32, NonZeroUsize}; use std::path::{Path, PathBuf}; @@ -33,18 +34,6 @@ pub mod defaults { pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; - /// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage. - /// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency - /// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach. - /// Both cases may trigger timeline download, that might download a lot of layers. This concurrency is limited by the clients internally, if needed. - pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC: usize = 50; - pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; - /// Currently, sync happens with AWS S3, that has two limits on requests per second: - /// ~200 RPS for IAM services - /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html - /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests - /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ - pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100; @@ -315,67 +304,6 @@ impl PageServerConfigBuilder { } } -/// External backup storage configuration, enough for creating a client for that storage. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RemoteStorageConfig { - /// Max allowed number of concurrent sync operations between pageserver and the remote storage. 
- pub max_concurrent_timelines_sync: NonZeroUsize, - /// Max allowed errors before the sync task is considered failed and evicted. - pub max_sync_errors: NonZeroU32, - /// The storage connection configuration. - pub storage: RemoteStorageKind, -} - -/// A kind of a remote storage to connect to, with its connection configuration. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum RemoteStorageKind { - /// Storage based on local file system. - /// Specify a root folder to place all stored files into. - LocalFs(PathBuf), - /// AWS S3 based storage, storing all files in the S3 bucket - /// specified by the config - AwsS3(S3Config), -} - -/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write). -#[derive(Clone, PartialEq, Eq)] -pub struct S3Config { - /// Name of the bucket to connect to. - pub bucket_name: String, - /// The region where the bucket is located at. - pub bucket_region: String, - /// A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once. - pub prefix_in_bucket: Option, - /// "Login" to use when connecting to bucket. - /// Can be empty for cases like AWS k8s IAM - /// where we can allow certain pods to connect - /// to the bucket directly without any credentials. - pub access_key_id: Option, - /// "Password" to use when connecting to bucket. - pub secret_access_key: Option, - /// A base URL to send S3 requests to. - /// By default, the endpoint is derived from a region name, assuming it's - /// an AWS S3 region name, erroring on wrong region name. - /// Endpoint provides a way to support other S3 flavors and their regions. - /// - /// Example: `http://127.0.0.1:5000` - pub endpoint: Option, - /// AWS S3 has various limits on its API calls, we need not to exceed those. - /// See [`defaults::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT`] for more details. - pub concurrency_limit: NonZeroUsize, -} - -impl std::fmt::Debug for S3Config { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("S3Config") - .field("bucket_name", &self.bucket_name) - .field("bucket_region", &self.bucket_region) - .field("prefix_in_bucket", &self.prefix_in_bucket) - .field("concurrency_limit", &self.concurrency_limit) - .finish() - } -} - impl PageServerConf { // // Repository paths, relative to workdir. @@ -523,21 +451,21 @@ impl PageServerConf { let bucket_name = toml.get("bucket_name"); let bucket_region = toml.get("bucket_region"); - let max_concurrent_timelines_sync = NonZeroUsize::new( - parse_optional_integer("max_concurrent_timelines_sync", toml)? - .unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC), + let max_concurrent_syncs = NonZeroUsize::new( + parse_optional_integer("max_concurrent_syncs", toml)? + .unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS), ) - .context("Failed to parse 'max_concurrent_timelines_sync' as a positive integer")?; + .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?; let max_sync_errors = NonZeroU32::new( parse_optional_integer("max_sync_errors", toml)? - .unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS), + .unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS), ) .context("Failed to parse 'max_sync_errors' as a positive integer")?; let concurrency_limit = NonZeroUsize::new( parse_optional_integer("concurrency_limit", toml)? 
- .unwrap_or(defaults::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT), + .unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT), ) .context("Failed to parse 'concurrency_limit' as a positive integer")?; @@ -552,16 +480,6 @@ impl PageServerConf { (None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config { bucket_name: parse_toml_string("bucket_name", bucket_name)?, bucket_region: parse_toml_string("bucket_region", bucket_region)?, - access_key_id: toml - .get("access_key_id") - .map(|access_key_id| parse_toml_string("access_key_id", access_key_id)) - .transpose()?, - secret_access_key: toml - .get("secret_access_key") - .map(|secret_access_key| { - parse_toml_string("secret_access_key", secret_access_key) - }) - .transpose()?, prefix_in_bucket: toml .get("prefix_in_bucket") .map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket)) @@ -579,7 +497,7 @@ impl PageServerConf { }; Ok(RemoteStorageConfig { - max_concurrent_timelines_sync, + max_concurrent_syncs, max_sync_errors, storage, }) @@ -807,11 +725,11 @@ pg_distrib_dir='{}' assert_eq!( parsed_remote_storage_config, RemoteStorageConfig { - max_concurrent_timelines_sync: NonZeroUsize::new( - defaults::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_TIMELINES_SYNC + max_concurrent_syncs: NonZeroUsize::new( + remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS ) .unwrap(), - max_sync_errors: NonZeroU32::new(defaults::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS) + max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS) .unwrap(), storage: RemoteStorageKind::LocalFs(local_storage_path.clone()), }, @@ -829,29 +747,25 @@ pg_distrib_dir='{}' let bucket_name = "some-sample-bucket".to_string(); let bucket_region = "eu-north-1".to_string(); let prefix_in_bucket = "test_prefix".to_string(); - let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string(); - let secret_access_key = "SOMEsEcReTsd292v".to_string(); let endpoint = "http://localhost:5000".to_string(); - let max_concurrent_timelines_sync = NonZeroUsize::new(111).unwrap(); + let max_concurrent_syncs = NonZeroUsize::new(111).unwrap(); let max_sync_errors = NonZeroU32::new(222).unwrap(); let s3_concurrency_limit = NonZeroUsize::new(333).unwrap(); let identical_toml_declarations = &[ format!( r#"[remote_storage] -max_concurrent_timelines_sync = {max_concurrent_timelines_sync} +max_concurrent_syncs = {max_concurrent_syncs} max_sync_errors = {max_sync_errors} bucket_name = '{bucket_name}' bucket_region = '{bucket_region}' prefix_in_bucket = '{prefix_in_bucket}' -access_key_id = '{access_key_id}' -secret_access_key = '{secret_access_key}' endpoint = '{endpoint}' concurrency_limit = {s3_concurrency_limit}"# ), format!( - "remote_storage={{max_concurrent_timelines_sync={max_concurrent_timelines_sync}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\ - bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', access_key_id='{access_key_id}', secret_access_key='{secret_access_key}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}", + "remote_storage={{max_concurrent_syncs={max_concurrent_syncs}, max_sync_errors={max_sync_errors}, bucket_name='{bucket_name}',\ + bucket_region='{bucket_region}', prefix_in_bucket='{prefix_in_bucket}', endpoint='{endpoint}', concurrency_limit={s3_concurrency_limit}}}", ), ]; @@ -874,13 +788,11 @@ pg_distrib_dir='{}' assert_eq!( parsed_remote_storage_config, RemoteStorageConfig { - max_concurrent_timelines_sync, + max_concurrent_syncs, 
max_sync_errors, storage: RemoteStorageKind::AwsS3(S3Config { bucket_name: bucket_name.clone(), bucket_region: bucket_region.clone(), - access_key_id: Some(access_key_id.clone()), - secret_access_key: Some(secret_access_key.clone()), prefix_in_bucket: Some(prefix_in_bucket.clone()), endpoint: Some(endpoint.clone()), concurrency_limit: s3_concurrency_limit, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f12e4c4051..8940efbda0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3,17 +3,16 @@ use std::sync::Arc; use anyhow::{Context, Result}; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; +use remote_storage::GenericRemoteStorage; use tracing::*; use super::models::{ StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TimelineCreateRequest, }; -use crate::config::RemoteStorageKind; -use crate::remote_storage::{ - download_index_part, schedule_layer_download, LocalFs, RemoteIndex, RemoteTimeline, S3Bucket, -}; use crate::repository::Repository; +use crate::storage_sync; +use crate::storage_sync::index::{RemoteIndex, RemoteTimeline}; use crate::tenant_config::TenantConfOpt; use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo}; use crate::{config::PageServerConf, tenant_mgr, timelines}; @@ -37,11 +36,6 @@ struct State { remote_storage: Option, } -enum GenericRemoteStorage { - Local(LocalFs), - S3(S3Bucket), -} - impl State { fn new( conf: &'static PageServerConf, @@ -57,14 +51,7 @@ impl State { let remote_storage = conf .remote_storage_config .as_ref() - .map(|storage_config| match &storage_config.storage { - RemoteStorageKind::LocalFs(root) => { - LocalFs::new(root.clone(), &conf.workdir).map(GenericRemoteStorage::Local) - } - RemoteStorageKind::AwsS3(s3_config) => { - S3Bucket::new(s3_config, &conf.workdir).map(GenericRemoteStorage::S3) - } - }) + .map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config)) .transpose() .context("Failed to init generic remote storage")?; @@ -273,7 +260,7 @@ async fn timeline_attach_handler(request: Request) -> Result) -> Result index_accessor.add_timeline_entry(sync_id, new_timeline), } - schedule_layer_download(tenant_id, timeline_id); + storage_sync::schedule_layer_download(tenant_id, timeline_id); json_response(StatusCode::ACCEPTED, ()) } @@ -319,10 +306,10 @@ async fn try_download_shard_data( ) -> anyhow::Result> { let shard = match state.remote_storage.as_ref() { Some(GenericRemoteStorage::Local(local_storage)) => { - download_index_part(state.conf, local_storage, sync_id).await + storage_sync::download_index_part(state.conf, local_storage, sync_id).await } Some(GenericRemoteStorage::S3(s3_storage)) => { - download_index_part(state.conf, s3_storage, sync_id).await + storage_sync::download_index_part(state.conf, s3_storage, sync_id).await } None => return Ok(None), } diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 77c01a7c66..da2699b15d 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -34,10 +34,9 @@ use std::time::{Duration, Instant, SystemTime}; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use crate::config::PageServerConf; use crate::keyspace::KeySpace; +use crate::storage_sync::index::RemoteIndex; use crate::tenant_config::{TenantConf, TenantConfOpt}; -use crate::page_cache; -use crate::remote_storage::{self, RemoteIndex}; use crate::repository::{ GcResult, Repository, 
RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter, }; @@ -48,6 +47,7 @@ use crate::virtual_file::VirtualFile; use crate::walreceiver::IS_WAL_RECEIVER; use crate::walredo::WalRedoManager; use crate::CheckpointConfig; +use crate::{page_cache, storage_sync}; use metrics::{ register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge_vec, @@ -1785,7 +1785,7 @@ impl LayeredTimeline { PERSISTENT_BYTES_WRITTEN.inc_by(new_delta_path.metadata()?.len()); if self.upload_layers.load(atomic::Ordering::Relaxed) { - remote_storage::schedule_layer_upload( + storage_sync::schedule_layer_upload( self.tenantid, self.timelineid, HashSet::from([new_delta_path]), @@ -1857,7 +1857,7 @@ impl LayeredTimeline { } } if self.upload_layers.load(atomic::Ordering::Relaxed) { - remote_storage::schedule_layer_upload( + storage_sync::schedule_layer_upload( self.tenantid, self.timelineid, layer_paths_to_upload, @@ -2056,13 +2056,13 @@ impl LayeredTimeline { drop(layers); if self.upload_layers.load(atomic::Ordering::Relaxed) { - remote_storage::schedule_layer_upload( + storage_sync::schedule_layer_upload( self.tenantid, self.timelineid, new_layer_paths, None, ); - remote_storage::schedule_layer_delete( + storage_sync::schedule_layer_delete( self.tenantid, self.timelineid, layer_paths_do_delete, @@ -2253,7 +2253,7 @@ impl LayeredTimeline { } if self.upload_layers.load(atomic::Ordering::Relaxed) { - remote_storage::schedule_layer_delete( + storage_sync::schedule_layer_delete( self.tenantid, self.timelineid, layer_paths_to_delete, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 0b1c53172c..83985069ec 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -9,8 +9,8 @@ pub mod page_service; pub mod pgdatadir_mapping; pub mod profiling; pub mod reltag; -pub mod remote_storage; pub mod repository; +pub mod storage_sync; pub mod tenant_config; pub mod tenant_mgr; pub mod tenant_threads; diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs deleted file mode 100644 index 4db0f6667d..0000000000 --- a/pageserver/src/remote_storage.rs +++ /dev/null @@ -1,412 +0,0 @@ -//! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage. -//! This particular module serves as a public API border between pageserver and the internal storage machinery. -//! No other modules from this tree are supposed to be used directly by the external code. -//! -//! There are a few components the storage machinery consists of: -//! * [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations: -//! * [`local_fs`] allows to use local file system as an external storage -//! * [`s3_bucket`] uses AWS S3 bucket as an external storage -//! -//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync. -//! Synchronization internals are split into submodules -//! * [`storage_sync::index`] to keep track of remote tenant files, the metadata and their mappings to local files -//! * [`storage_sync::upload`] and [`storage_sync::download`] to manage archive creation and upload; download and extraction, respectively -//! -//! * public API via to interact with the external world: -//! * [`start_local_timeline_sync`] to launch a background async loop to handle the synchronization -//! 
* [`schedule_layer_upload`], [`schedule_layer_download`] and [`schedule_layer_delete`] to enqueue a new upload and download tasks, -//! to be processed by the async loop -//! -//! Here's a schematic overview of all interactions backup and the rest of the pageserver perform: -//! -//! +------------------------+ +--------->-------+ -//! | | - - - (init async loop) - - - -> | | -//! | | | | -//! | | -------------------------------> | async | -//! | pageserver | (enqueue timeline sync task) | upload/download | -//! | | | loop | -//! | | <------------------------------- | | -//! | | (apply new timeline sync states) | | -//! +------------------------+ +---------<-------+ -//! | -//! | -//! CRUD layer file operations | -//! (upload/download/delete/list, etc.) | -//! V -//! +------------------------+ -//! | | -//! | [`RemoteStorage`] impl | -//! | | -//! | pageserver assumes it | -//! | owns exclusive write | -//! | access to this storage | -//! +------------------------+ -//! -//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop uninitialised, if configured so. -//! The loop inits the storage connection and checks the remote files stored. -//! This is done once at startup only, relying on the fact that pageserver uses the storage alone (ergo, nobody else uploads the files to the storage but this server). -//! Based on the remote storage data, the sync logic immediately schedules sync tasks for local timelines and reports about remote only timelines to pageserver, so it can -//! query their downloads later if they are accessed. -//! -//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata. -//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint. -//! The checkpoint uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either). -//! See [`crate::layered_repository`] for the upload calls and the adjacent logic. -//! -//! Synchronization logic is able to communicate back with updated timeline sync states, [`crate::repository::TimelineSyncStatusUpdate`], -//! submitted via [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function. Tenant manager applies corresponding timeline updates in pageserver's in-memory state. -//! Such submissions happen in two cases: -//! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future -//! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory -//! -//! When the pageserver terminates, the sync loop finishes a current sync task (if any) and exits. -//! -//! The storage logic considers `image` as a set of local files (layers), fully representing a certain timeline at given moment (identified with `disk_consistent_lsn` from the corresponding `metadata` file). -//! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed -//! by the storage upload, if enabled. -//! Yet timeline cannot alter already existing files, and cannot remove those too: only a GC process is capable of removing unused files. -//! This way, remote storage synchronization relies on the fact that every checkpoint is incremental and local files are "immutable": -//! 
* when a certain checkpoint gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same state -//! * no files are deleted from either local or remote storage, only the missing ones locally/remotely get downloaded/uploaded, local metadata file will be overwritten -//! when the newer image is downloaded -//! -//! Pageserver maintains similar to the local file structure remotely: all layer files are uploaded with the same names under the same directory structure. -//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexPart`], containing the list of remote files. -//! This file gets read to populate the cache, if the remote timeline data is missing from it and gets updated after every successful download. -//! This way, we optimize S3 storage access by not running the `S3 list` command that could be expencive and slow: knowing both [`ZTenantId`] and [`ZTimelineId`], -//! we can always reconstruct the path to the timeline, use this to get the same path on the remote storage and retrive its part contents, if needed, same as any layer files. -//! -//! By default, pageserver reads the remote storage index data only for timelines located locally, to synchronize those, if needed. -//! Bulk index data download happens only initially, on pageserer startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only, -//! when a new timeline is scheduled for the download. -//! -//! NOTES: -//! * pageserver assumes it has exclusive write access to the remote storage. If supported, the way multiple pageservers can be separated in the same storage -//! (i.e. using different directories in the local filesystem external storage), but totally up to the storage implementation and not covered with the trait API. -//! -//! * the sync tasks may not processed immediately after the submission: if they error and get re-enqueued, their execution might be backed off to ensure error cap is not exceeded too fast. -//! The sync queue processing also happens in batches, so the sync tasks can wait in the queue for some time. - -mod local_fs; -mod s3_bucket; -mod storage_sync; - -use std::{ - collections::{HashMap, HashSet}, - ffi, fs, - path::{Path, PathBuf}, -}; - -use anyhow::{bail, Context}; -use tokio::io; -use tracing::{debug, error, info}; - -use self::storage_sync::TEMP_DOWNLOAD_EXTENSION; -pub use self::{ - local_fs::LocalFs, - s3_bucket::S3Bucket, - storage_sync::{ - download_index_part, - index::{IndexPart, RemoteIndex, RemoteTimeline}, - schedule_layer_delete, schedule_layer_download, schedule_layer_upload, - }, -}; -use crate::{ - config::{PageServerConf, RemoteStorageKind}, - layered_repository::{ - ephemeral_file::is_ephemeral_file, - metadata::{TimelineMetadata, METADATA_FILE_NAME}, - }, -}; -use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; - -/// A timeline status to share with pageserver's sync counterpart, -/// after comparing local and remote timeline state. -#[derive(Clone, Copy, Debug)] -pub enum LocalTimelineInitStatus { - /// The timeline has every remote layer present locally. - /// There could be some layers requiring uploading, - /// but this does not block the timeline from any user interaction. - LocallyComplete, - /// A timeline has some files remotely, that are not present locally and need downloading. 
- /// Downloading might update timeline's metadata locally and current pageserver logic deals with local layers only, - /// so the data needs to be downloaded first before the timeline can be used. - NeedsSync, -} - -type LocalTimelineInitStatuses = HashMap>; - -/// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization. -/// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still, -/// to simplify the received code. -pub struct SyncStartupData { - pub remote_index: RemoteIndex, - pub local_timeline_init_statuses: LocalTimelineInitStatuses, -} - -/// Based on the config, initiates the remote storage connection and starts a separate thread -/// that ensures that pageserver and the remote storage are in sync with each other. -/// If no external configuration connection given, no thread or storage initialization is done. -/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states. -pub fn start_local_timeline_sync( - config: &'static PageServerConf, -) -> anyhow::Result { - let local_timeline_files = local_tenant_timeline_files(config) - .context("Failed to collect local tenant timeline files")?; - - match &config.remote_storage_config { - Some(storage_config) => match &storage_config.storage { - RemoteStorageKind::LocalFs(root) => { - info!("Using fs root '{}' as a remote storage", root.display()); - storage_sync::spawn_storage_sync_thread( - config, - local_timeline_files, - LocalFs::new(root.clone(), &config.workdir)?, - storage_config.max_concurrent_timelines_sync, - storage_config.max_sync_errors, - ) - }, - RemoteStorageKind::AwsS3(s3_config) => { - info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'", - s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint); - storage_sync::spawn_storage_sync_thread( - config, - local_timeline_files, - S3Bucket::new(s3_config, &config.workdir)?, - storage_config.max_concurrent_timelines_sync, - storage_config.max_sync_errors, - ) - }, - } - .context("Failed to spawn the storage sync thread"), - None => { - info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled"); - let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); - for (ZTenantTimelineId { tenant_id, timeline_id }, _) in - local_timeline_files - { - local_timeline_init_statuses - .entry(tenant_id) - .or_default() - .insert(timeline_id, LocalTimelineInitStatus::LocallyComplete); - } - Ok(SyncStartupData { - local_timeline_init_statuses, - remote_index: RemoteIndex::empty(), - }) - } - } -} - -fn local_tenant_timeline_files( - config: &'static PageServerConf, -) -> anyhow::Result)>> { - let mut local_tenant_timeline_files = HashMap::new(); - let tenants_dir = config.tenants_path(); - for tenants_dir_entry in fs::read_dir(&tenants_dir) - .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))? 
- { - match &tenants_dir_entry { - Ok(tenants_dir_entry) => { - match collect_timelines_for_tenant(config, &tenants_dir_entry.path()) { - Ok(collected_files) => { - local_tenant_timeline_files.extend(collected_files.into_iter()) - } - Err(e) => error!( - "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}", - tenants_dir.display(), - tenants_dir_entry, - e - ), - } - } - Err(e) => error!( - "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}", - tenants_dir_entry, - tenants_dir.display(), - e - ), - } - } - - Ok(local_tenant_timeline_files) -} - -fn collect_timelines_for_tenant( - config: &'static PageServerConf, - tenant_path: &Path, -) -> anyhow::Result)>> { - let mut timelines = HashMap::new(); - let tenant_id = tenant_path - .file_name() - .and_then(ffi::OsStr::to_str) - .unwrap_or_default() - .parse::() - .context("Could not parse tenant id out of the tenant dir name")?; - let timelines_dir = config.timelines_path(&tenant_id); - - for timelines_dir_entry in fs::read_dir(&timelines_dir).with_context(|| { - format!( - "Failed to list timelines dir entry for tenant {}", - tenant_id - ) - })? { - match timelines_dir_entry { - Ok(timelines_dir_entry) => { - let timeline_path = timelines_dir_entry.path(); - match collect_timeline_files(&timeline_path) { - Ok((timeline_id, metadata, timeline_files)) => { - timelines.insert( - ZTenantTimelineId { - tenant_id, - timeline_id, - }, - (metadata, timeline_files), - ); - } - Err(e) => error!( - "Failed to process timeline dir contents at '{}', reason: {:?}", - timeline_path.display(), - e - ), - } - } - Err(e) => error!( - "Failed to list timelines for entry tenant {}, reason: {:?}", - tenant_id, e - ), - } - } - - Ok(timelines) -} - -// discover timeline files and extract timeline metadata -// NOTE: ephemeral files are excluded from the list -fn collect_timeline_files( - timeline_dir: &Path, -) -> anyhow::Result<(ZTimelineId, TimelineMetadata, HashSet)> { - let mut timeline_files = HashSet::new(); - let mut timeline_metadata_path = None; - - let timeline_id = timeline_dir - .file_name() - .and_then(ffi::OsStr::to_str) - .unwrap_or_default() - .parse::() - .context("Could not parse timeline id out of the timeline dir name")?; - let timeline_dir_entries = - fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?; - for entry in timeline_dir_entries { - let entry_path = entry.context("Failed to list timeline dir entry")?.path(); - if entry_path.is_file() { - if entry_path.file_name().and_then(ffi::OsStr::to_str) == Some(METADATA_FILE_NAME) { - timeline_metadata_path = Some(entry_path); - } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) { - debug!("skipping ephemeral file {}", entry_path.display()); - continue; - } else if entry_path.extension().and_then(ffi::OsStr::to_str) - == Some(TEMP_DOWNLOAD_EXTENSION) - { - info!("removing temp download file at {}", entry_path.display()); - fs::remove_file(&entry_path).with_context(|| { - format!( - "failed to remove temp download file at {}", - entry_path.display() - ) - })?; - } else { - timeline_files.insert(entry_path); - } - } - } - - // FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed - // then attach is lost. There would be no retries for that, - // initial collect will fail because there is no metadata. 
- // We either need to start download if we see empty dir after restart or attach caller should - // be aware of that and retry attach if awaits_download for timeline switched from true to false - // but timelinne didnt appear locally. - // Check what happens with remote index in that case. - let timeline_metadata_path = match timeline_metadata_path { - Some(path) => path, - None => bail!("No metadata file found in the timeline directory"), - }; - let metadata = TimelineMetadata::from_bytes( - &fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?, - ) - .context("Failed to parse timeline metadata file bytes")?; - - Ok((timeline_id, metadata, timeline_files)) -} - -/// Storage (potentially remote) API to manage its state. -/// This storage tries to be unaware of any layered repository context, -/// providing basic CRUD operations for storage files. -#[async_trait::async_trait] -pub trait RemoteStorage: Send + Sync { - /// A way to uniquely reference a file in the remote storage. - type StoragePath; - - /// Attempts to derive the storage path out of the local path, if the latter is correct. - fn storage_path(&self, local_path: &Path) -> anyhow::Result; - - /// Gets the download path of the given storage file. - fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result; - - /// Lists all items the storage has right now. - async fn list(&self) -> anyhow::Result>; - - /// Streams the local file contents into remote into the remote storage entry. - async fn upload( - &self, - from: impl io::AsyncRead + Unpin + Send + Sync + 'static, - // S3 PUT request requires the content length to be specified, - // otherwise it starts to fail with the concurrent connection count increasing. - from_size_bytes: usize, - to: &Self::StoragePath, - metadata: Option, - ) -> anyhow::Result<()>; - - /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer. - /// Returns the metadata, if any was stored with the file previously. - async fn download( - &self, - from: &Self::StoragePath, - to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), - ) -> anyhow::Result>; - - /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer. - /// Returns the metadata, if any was stored with the file previously. - async fn download_range( - &self, - from: &Self::StoragePath, - start_inclusive: u64, - end_exclusive: Option, - to: &mut (impl io::AsyncWrite + Unpin + Send + Sync), - ) -> anyhow::Result>; - - async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>; -} - -/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry. -/// Immutable, cannot be changed once the file is created. 
-#[derive(Debug, Clone, PartialEq, Eq)] -pub struct StorageMetadata(HashMap); - -fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> { - if prefix == path { - anyhow::bail!( - "Prefix and the path are equal, cannot strip: '{}'", - prefix.display() - ) - } else { - path.strip_prefix(prefix).with_context(|| { - format!( - "Path '{}' is not prefixed with '{}'", - path.display(), - prefix.display(), - ) - }) - } -} diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 5044f2bfc5..d25dc8914d 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,5 +1,5 @@ use crate::layered_repository::metadata::TimelineMetadata; -use crate::remote_storage::RemoteIndex; +use crate::storage_sync::index::RemoteIndex; use crate::walrecord::ZenithWalRecord; use crate::CheckpointConfig; use anyhow::{bail, Result}; diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/storage_sync.rs similarity index 77% rename from pageserver/src/remote_storage/storage_sync.rs rename to pageserver/src/storage_sync.rs index 8a26685a7d..bcc18e8ce4 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -1,3 +1,87 @@ +//! There are a few components the storage machinery consists of: +//! +//! * [`RemoteStorage`] that is used to interact with an arbitrary external storage +//! +//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync. +//! Synchronization internals are split into submodules +//! * [`storage_sync::index`] to keep track of remote tenant files, the metadata and their mappings to local files +//! * [`storage_sync::upload`] and [`storage_sync::download`] to manage archive creation and upload; download and extraction, respectively +//! +//! * public API via to interact with the external world: +//! * [`start_local_timeline_sync`] to launch a background async loop to handle the synchronization +//! * [`schedule_timeline_checkpoint_upload`] and [`schedule_timeline_download`] to enqueue a new upload and download tasks, +//! to be processed by the async loop +//! +//! Here's a schematic overview of all interactions backup and the rest of the pageserver perform: +//! +//! +------------------------+ +--------->-------+ +//! | | - - - (init async loop) - - - -> | | +//! | | | | +//! | | -------------------------------> | async | +//! | pageserver | (enqueue timeline sync task) | upload/download | +//! | | | loop | +//! | | <------------------------------- | | +//! | | (apply new timeline sync states) | | +//! +------------------------+ +---------<-------+ +//! | +//! | +//! CRUD layer file operations | +//! (upload/download/delete/list, etc.) | +//! V +//! +------------------------+ +//! | | +//! | [`RemoteStorage`] impl | +//! | | +//! | pageserver assumes it | +//! | owns exclusive write | +//! | access to this storage | +//! +------------------------+ +//! +//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop uninitialised, if configured so. +//! The loop inits the storage connection and checks the remote files stored. +//! This is done once at startup only, relying on the fact that pageserver uses the storage alone (ergo, nobody else uploads the files to the storage but this server). +//! 
Based on the remote storage data, the sync logic immediately schedules sync tasks for local timelines and reports about remote only timelines to pageserver, so it can
+//! query their downloads later if they are accessed.
+//!
+//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata.
+//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint.
+//! The checkpoint uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
+//! See [`crate::layered_repository`] for the upload calls and the adjacent logic.
+//!
+//! Synchronization logic is able to communicate back with updated timeline sync states, [`crate::repository::TimelineSyncStatusUpdate`],
+//! submitted via [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function. Tenant manager applies corresponding timeline updates in pageserver's in-memory state.
+//! Such submissions happen in two cases:
+//! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future
+//! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory
+//!
+//! When the pageserver terminates, the sync loop finishes a current sync task (if any) and exits.
+//!
+//! The storage logic considers `image` as a set of local files (layers), fully representing a certain timeline at a given moment (identified with `disk_consistent_lsn` from the corresponding `metadata` file).
+//! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed
+//! by the storage upload, if enabled.
+//! Yet timeline cannot alter already existing files, and cannot remove those too: only a GC process is capable of removing unused files.
+//! This way, remote storage synchronization relies on the fact that every checkpoint is incremental and local files are "immutable":
+//! * when a certain checkpoint gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same state
+//! * no files are deleted from either local or remote storage, only the missing ones locally/remotely get downloaded/uploaded, local metadata file will be overwritten
+//! when the newer image is downloaded
+//!
+//! Pageserver maintains a file structure remotely that mirrors the local one: all layer files are uploaded with the same names under the same directory structure.
+//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexPart`], containing the list of remote files.
+//! This file gets read to populate the cache, if the remote timeline data is missing from it and gets updated after every successful download.
+//! This way, we optimize S3 storage access by not running the `S3 list` command that could be expensive and slow: knowing both [`ZTenantId`] and [`ZTimelineId`],
+//! we can always reconstruct the path to the timeline, use this to get the same path on the remote storage and retrieve its part contents, if needed, same as any layer files.
+//!
+//! By default, pageserver reads the remote storage index data only for timelines located locally, to synchronize those, if needed.
+//! Bulk index data download happens only initially, on pageserver startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only,
+//!
when a new timeline is scheduled for the download. +//! +//! NOTES: +//! * pageserver assumes it has exclusive write access to the remote storage. If supported, the way multiple pageservers can be separated in the same storage +//! (i.e. using different directories in the local filesystem external storage), but totally up to the storage implementation and not covered with the trait API. +//! +//! * the sync tasks may not processed immediately after the submission: if they error and get re-enqueued, their execution might be backed off to ensure error cap is not exceeded too fast. +//! The sync queue processing also happens in batches, so the sync tasks can wait in the queue for some time. +//! //! A synchronization logic for the [`RemoteStorage`] and pageserver in-memory state to ensure correct synchronizations //! between local tenant files and their counterparts from the remote storage. //! @@ -62,7 +146,6 @@ pub mod index; mod upload; use std::{ - borrow::Cow, collections::{HashMap, HashSet, VecDeque}, ffi::OsStr, fmt::Debug, @@ -75,6 +158,7 @@ use std::{ use anyhow::{bail, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; +use remote_storage::{GenericRemoteStorage, RemoteStorage}; use tokio::{ fs, runtime::Runtime, @@ -85,17 +169,18 @@ use tracing::*; use self::{ download::{download_timeline_layers, DownloadedTimeline}, - index::{IndexPart, RemoteIndex, RemoteTimeline, RemoteTimelineIndex}, + index::{IndexPart, RemoteTimeline, RemoteTimelineIndex}, upload::{upload_index_part, upload_timeline_layers, UploadedTimeline}, }; -use super::{LocalTimelineInitStatus, LocalTimelineInitStatuses, RemoteStorage, SyncStartupData}; use crate::{ config::PageServerConf, layered_repository::{ - metadata::{metadata_path, TimelineMetadata}, + ephemeral_file::is_ephemeral_file, + metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}, LayeredRepository, }, repository::TimelineSyncStatusUpdate, + storage_sync::{self, index::RemoteIndex}, tenant_mgr::apply_timeline_sync_status_updates, thread_mgr, thread_mgr::ThreadKind, @@ -134,6 +219,232 @@ lazy_static! { .expect("failed to register pageserver image sync time histogram vec"); } +/// A timeline status to share with pageserver's sync counterpart, +/// after comparing local and remote timeline state. +#[derive(Clone, Copy, Debug)] +pub enum LocalTimelineInitStatus { + /// The timeline has every remote layer present locally. + /// There could be some layers requiring uploading, + /// but this does not block the timeline from any user interaction. + LocallyComplete, + /// A timeline has some files remotely, that are not present locally and need downloading. + /// Downloading might update timeline's metadata locally and current pageserver logic deals with local layers only, + /// so the data needs to be downloaded first before the timeline can be used. + NeedsSync, +} + +type LocalTimelineInitStatuses = HashMap>; + +/// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization. +/// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still, +/// to simplify the received code. +pub struct SyncStartupData { + pub remote_index: RemoteIndex, + pub local_timeline_init_statuses: LocalTimelineInitStatuses, +} + +/// Based on the config, initiates the remote storage connection and starts a separate thread +/// that ensures that pageserver and the remote storage are in sync with each other. 
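The scheduling entry points described in the module documentation above are driven from the rest of the pageserver roughly as in this sketch. It is illustrative only: the wrapper function, the parameter types and the layer path are hypothetical, while the call shapes are taken from the `layered_repository.rs` and `routes.rs` changes in this patch.

```rust
use std::collections::HashSet;
use std::path::PathBuf;

use crate::storage_sync;
use utils::zid::{ZTenantId, ZTimelineId};

// Hypothetical wrapper showing the crate-internal call shape.
fn enqueue_sync_work(tenant_id: ZTenantId, timeline_id: ZTimelineId, new_layer: PathBuf) {
    // After a checkpoint flushes a new layer file, hand it to the sync loop;
    // the last argument is the optional metadata update (`None` here, as in
    // the compaction call sites in this patch).
    storage_sync::schedule_layer_upload(tenant_id, timeline_id, HashSet::from([new_layer]), None);

    // On timeline attach, ask the loop to fetch the timeline's remote layers.
    storage_sync::schedule_layer_download(tenant_id, timeline_id);
}
```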
+/// If no external configuration connection given, no thread or storage initialization is done. +/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states. +pub fn start_local_timeline_sync( + config: &'static PageServerConf, +) -> anyhow::Result { + let local_timeline_files = local_tenant_timeline_files(config) + .context("Failed to collect local tenant timeline files")?; + + match config.remote_storage_config.as_ref() { + Some(storage_config) => { + match GenericRemoteStorage::new(config.workdir.clone(), storage_config) + .context("Failed to init the generic remote storage")? + { + GenericRemoteStorage::Local(local_fs_storage) => { + storage_sync::spawn_storage_sync_thread( + config, + local_timeline_files, + local_fs_storage, + storage_config.max_concurrent_syncs, + storage_config.max_sync_errors, + ) + } + GenericRemoteStorage::S3(s3_bucket_storage) => { + storage_sync::spawn_storage_sync_thread( + config, + local_timeline_files, + s3_bucket_storage, + storage_config.max_concurrent_syncs, + storage_config.max_sync_errors, + ) + } + } + .context("Failed to spawn the storage sync thread") + } + None => { + info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled"); + let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); + for ( + ZTenantTimelineId { + tenant_id, + timeline_id, + }, + _, + ) in local_timeline_files + { + local_timeline_init_statuses + .entry(tenant_id) + .or_default() + .insert(timeline_id, LocalTimelineInitStatus::LocallyComplete); + } + Ok(SyncStartupData { + local_timeline_init_statuses, + remote_index: RemoteIndex::empty(), + }) + } + } +} + +fn local_tenant_timeline_files( + config: &'static PageServerConf, +) -> anyhow::Result)>> { + let mut local_tenant_timeline_files = HashMap::new(); + let tenants_dir = config.tenants_path(); + for tenants_dir_entry in std::fs::read_dir(&tenants_dir) + .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))? + { + match &tenants_dir_entry { + Ok(tenants_dir_entry) => { + match collect_timelines_for_tenant(config, &tenants_dir_entry.path()) { + Ok(collected_files) => { + local_tenant_timeline_files.extend(collected_files.into_iter()) + } + Err(e) => error!( + "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}", + tenants_dir.display(), + tenants_dir_entry, + e + ), + } + } + Err(e) => error!( + "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}", + tenants_dir_entry, + tenants_dir.display(), + e + ), + } + } + + Ok(local_tenant_timeline_files) +} + +fn collect_timelines_for_tenant( + config: &'static PageServerConf, + tenant_path: &Path, +) -> anyhow::Result)>> { + let mut timelines = HashMap::new(); + let tenant_id = tenant_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .context("Could not parse tenant id out of the tenant dir name")?; + let timelines_dir = config.timelines_path(&tenant_id); + + for timelines_dir_entry in std::fs::read_dir(&timelines_dir).with_context(|| { + format!( + "Failed to list timelines dir entry for tenant {}", + tenant_id + ) + })? 
{ + match timelines_dir_entry { + Ok(timelines_dir_entry) => { + let timeline_path = timelines_dir_entry.path(); + match collect_timeline_files(&timeline_path) { + Ok((timeline_id, metadata, timeline_files)) => { + timelines.insert( + ZTenantTimelineId { + tenant_id, + timeline_id, + }, + (metadata, timeline_files), + ); + } + Err(e) => error!( + "Failed to process timeline dir contents at '{}', reason: {:?}", + timeline_path.display(), + e + ), + } + } + Err(e) => error!( + "Failed to list timelines for entry tenant {}, reason: {:?}", + tenant_id, e + ), + } + } + + Ok(timelines) +} + +// discover timeline files and extract timeline metadata +// NOTE: ephemeral files are excluded from the list +fn collect_timeline_files( + timeline_dir: &Path, +) -> anyhow::Result<(ZTimelineId, TimelineMetadata, HashSet)> { + let mut timeline_files = HashSet::new(); + let mut timeline_metadata_path = None; + + let timeline_id = timeline_dir + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .context("Could not parse timeline id out of the timeline dir name")?; + let timeline_dir_entries = + std::fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?; + for entry in timeline_dir_entries { + let entry_path = entry.context("Failed to list timeline dir entry")?.path(); + if entry_path.is_file() { + if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) { + timeline_metadata_path = Some(entry_path); + } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) { + debug!("skipping ephemeral file {}", entry_path.display()); + continue; + } else if entry_path.extension().and_then(OsStr::to_str) + == Some(TEMP_DOWNLOAD_EXTENSION) + { + info!("removing temp download file at {}", entry_path.display()); + std::fs::remove_file(&entry_path).with_context(|| { + format!( + "failed to remove temp download file at {}", + entry_path.display() + ) + })?; + } else { + timeline_files.insert(entry_path); + } + } + } + + // FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed + // then attach is lost. There would be no retries for that, + // initial collect will fail because there is no metadata. + // We either need to start download if we see empty dir after restart or attach caller should + // be aware of that and retry attach if awaits_download for timeline switched from true to false + // but timelinne didnt appear locally. + // Check what happens with remote index in that case. + let timeline_metadata_path = match timeline_metadata_path { + Some(path) => path, + None => bail!("No metadata file found in the timeline directory"), + }; + let metadata = TimelineMetadata::from_bytes( + &std::fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?, + ) + .context("Failed to parse timeline metadata file bytes")?; + + Ok((timeline_id, metadata, timeline_files)) +} + /// Wraps mpsc channel bits around into a queue interface. /// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning. 
mod sync_queue { @@ -505,7 +816,7 @@ pub(super) fn spawn_storage_sync_thread( ) -> anyhow::Result where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let (sender, receiver) = mpsc::unbounded_channel(); sync_queue::init(sender)?; @@ -566,7 +877,7 @@ fn storage_sync_loop( max_sync_errors: NonZeroU32, ) where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { info!("Starting remote storage sync loop"); loop { @@ -618,7 +929,7 @@ async fn loop_step( ) -> ControlFlow<(), HashMap>> where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let batched_tasks = match sync_queue::next_task_batch(receiver, max_concurrent_timelines_sync).await { @@ -677,7 +988,7 @@ async fn process_sync_task( ) -> Option where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let sync_start = Instant::now(); let current_remote_timeline = { index.read().await.timeline_entry(&sync_id).cloned() }; @@ -810,7 +1121,7 @@ async fn download_timeline( ) -> Option where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { match download_timeline_layers( conf, @@ -936,7 +1247,7 @@ async fn upload_timeline( task_name: &str, ) where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let mut uploaded_data = match upload_timeline_layers(storage, current_remote_timeline, sync_id, new_upload_data) @@ -991,7 +1302,7 @@ async fn update_remote_data( ) -> anyhow::Result<()> where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { info!("Updating remote index for the timeline"); let updated_remote_timeline = { @@ -1101,7 +1412,7 @@ async fn try_fetch_index_parts( ) -> HashMap where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let mut index_parts = HashMap::with_capacity(keys.len()); @@ -1246,20 +1557,6 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Optio .observe(secs_elapsed) } -pub fn path_with_suffix_extension(original_path: impl AsRef, suffix: &str) -> PathBuf { - let new_extension = match original_path - .as_ref() - .extension() - .map(OsStr::to_string_lossy) - { - Some(extension) => Cow::Owned(format!("{extension}.{suffix}")), - None => Cow::Borrowed(suffix), - }; - original_path - .as_ref() - .with_extension(new_extension.as_ref()) -} - #[cfg(test)] mod test_utils { use utils::lsn::Lsn; @@ -1671,28 +1968,4 @@ mod tests { "Merged upload tasks should have a metadata with biggest disk_consistent_lsn" ); } - - #[test] - fn test_path_with_suffix_extension() { - let p = PathBuf::from("/foo/bar"); - assert_eq!( - &path_with_suffix_extension(&p, "temp").to_string_lossy(), - "/foo/bar.temp" - ); - let p = PathBuf::from("/foo/bar"); - assert_eq!( - &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(), - "/foo/bar.temp.temp" - ); - let p = PathBuf::from("/foo/bar.baz"); - assert_eq!( - &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(), - "/foo/bar.baz.temp.temp" - ); - let p = PathBuf::from("/foo/bar.baz"); - assert_eq!( - &path_with_suffix_extension(&p, ".temp").to_string_lossy(), - "/foo/bar.baz..temp" - ); 
- } } diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs similarity index 93% rename from pageserver/src/remote_storage/storage_sync/download.rs rename to pageserver/src/storage_sync/download.rs index 7e2496b796..dca08bca5d 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -4,6 +4,7 @@ use std::{collections::HashSet, fmt::Debug, path::Path}; use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; +use remote_storage::{path_with_suffix_extension, RemoteStorage}; use tokio::{ fs, io::{self, AsyncWriteExt}, @@ -13,10 +14,7 @@ use tracing::{debug, error, info, warn}; use crate::{ config::PageServerConf, layered_repository::metadata::metadata_path, - remote_storage::{ - storage_sync::{path_with_suffix_extension, sync_queue, SyncTask}, - RemoteStorage, - }, + storage_sync::{sync_queue, SyncTask}, }; use utils::zid::ZTenantTimelineId; @@ -35,17 +33,19 @@ pub async fn download_index_part( ) -> anyhow::Result where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id) .with_file_name(IndexPart::FILE_NAME) .with_extension(IndexPart::FILE_EXTENSION); - let part_storage_path = storage.storage_path(&index_part_path).with_context(|| { - format!( - "Failed to get the index part storage path for local path '{}'", - index_part_path.display() - ) - })?; + let part_storage_path = storage + .remote_object_id(&index_part_path) + .with_context(|| { + format!( + "Failed to get the index part storage path for local path '{}'", + index_part_path.display() + ) + })?; let mut index_part_bytes = Vec::new(); storage .download(&part_storage_path, &mut index_part_bytes) @@ -93,7 +93,7 @@ pub(super) async fn download_timeline_layers<'a, P, S>( ) -> DownloadedTimeline where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let remote_timeline = match remote_timeline { Some(remote_timeline) => { @@ -130,7 +130,7 @@ where ); } else { let layer_storage_path = storage - .storage_path(&layer_desination_path) + .remote_object_id(&layer_desination_path) .with_context(|| { format!( "Failed to get the layer storage path for local path '{}'", @@ -262,18 +262,16 @@ async fn fsync_path(path: impl AsRef) -> Result<(), io::Error> { mod tests { use std::collections::{BTreeSet, HashSet}; + use remote_storage::{LocalFs, RemoteStorage}; use tempfile::tempdir; use utils::lsn::Lsn; use crate::{ - remote_storage::{ - storage_sync::{ - index::RelativePath, - test_utils::{create_local_timeline, dummy_metadata}, - }, - LocalFs, - }, repository::repo_harness::{RepoHarness, TIMELINE_ID}, + storage_sync::{ + index::RelativePath, + test_utils::{create_local_timeline, dummy_metadata}, + }, }; use super::*; @@ -283,7 +281,10 @@ mod tests { let harness = RepoHarness::create("download_timeline")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"]; - let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?; + let storage = LocalFs::new( + tempdir()?.path().to_path_buf(), + harness.conf.workdir.clone(), + )?; let current_retries = 3; let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -291,7 +292,7 @@ mod tests { 
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?; for local_path in timeline_upload.layers_to_upload { - let remote_path = storage.storage_path(&local_path)?; + let remote_path = storage.remote_object_id(&local_path)?; let remote_parent_dir = remote_path.parent().unwrap(); if !remote_parent_dir.exists() { fs::create_dir_all(&remote_parent_dir).await?; @@ -375,7 +376,7 @@ mod tests { async fn download_timeline_negatives() -> anyhow::Result<()> { let harness = RepoHarness::create("download_timeline_negatives")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &harness.conf.workdir)?; + let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; let empty_remote_timeline_download = download_timeline_layers( harness.conf, @@ -429,7 +430,10 @@ mod tests { let harness = RepoHarness::create("test_download_index_part")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?; + let storage = LocalFs::new( + tempdir()?.path().to_path_buf(), + harness.conf.workdir.clone(), + )?; let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -450,7 +454,7 @@ mod tests { metadata_path(harness.conf, sync_id.timeline_id, sync_id.tenant_id) .with_file_name(IndexPart::FILE_NAME) .with_extension(IndexPart::FILE_EXTENSION); - let storage_path = storage.storage_path(&local_index_part_path)?; + let storage_path = storage.remote_object_id(&local_index_part_path)?; fs::create_dir_all(storage_path.parent().unwrap()).await?; fs::write(&storage_path, serde_json::to_vec(&index_part)?).await?; diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs similarity index 100% rename from pageserver/src/remote_storage/storage_sync/index.rs rename to pageserver/src/storage_sync/index.rs diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs similarity index 93% rename from pageserver/src/remote_storage/storage_sync/upload.rs rename to pageserver/src/storage_sync/upload.rs index 91a0a0d6ce..55089df7bc 100644 --- a/pageserver/src/remote_storage/storage_sync/upload.rs +++ b/pageserver/src/storage_sync/upload.rs @@ -4,20 +4,21 @@ use std::{fmt::Debug, path::PathBuf}; use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; +use remote_storage::RemoteStorage; use tokio::fs; use tracing::{debug, error, info, warn}; use crate::{ config::PageServerConf, layered_repository::metadata::metadata_path, - remote_storage::{ - storage_sync::{index::RemoteTimeline, sync_queue, SyncTask}, - RemoteStorage, - }, + storage_sync::{sync_queue, SyncTask}, }; use utils::zid::ZTenantTimelineId; -use super::{index::IndexPart, SyncData, TimelineUpload}; +use super::{ + index::{IndexPart, RemoteTimeline}, + SyncData, TimelineUpload, +}; /// Serializes and uploads the given index part data to the remote storage. 
pub(super) async fn upload_index_part( @@ -28,7 +29,7 @@ pub(super) async fn upload_index_part( ) -> anyhow::Result<()> where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let index_part_bytes = serde_json::to_vec(&index_part) .context("Failed to serialize index part file into bytes")?; @@ -38,12 +39,15 @@ where let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id) .with_file_name(IndexPart::FILE_NAME) .with_extension(IndexPart::FILE_EXTENSION); - let index_part_storage_path = storage.storage_path(&index_part_path).with_context(|| { - format!( - "Failed to get the index part storage path for local path '{}'", - index_part_path.display() - ) - })?; + let index_part_storage_path = + storage + .remote_object_id(&index_part_path) + .with_context(|| { + format!( + "Failed to get the index part storage path for local path '{}'", + index_part_path.display() + ) + })?; storage .upload( @@ -83,7 +87,7 @@ pub(super) async fn upload_timeline_layers<'a, P, S>( ) -> UploadedTimeline where P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, { let upload = &mut upload_data.data; let new_upload_lsn = upload @@ -112,7 +116,7 @@ where .into_iter() .map(|source_path| async move { let storage_path = storage - .storage_path(&source_path) + .remote_object_id(&source_path) .with_context(|| { format!( "Failed to get the layer storage path for local path '{}'", @@ -211,18 +215,16 @@ enum UploadError { mod tests { use std::collections::{BTreeSet, HashSet}; + use remote_storage::LocalFs; use tempfile::tempdir; use utils::lsn::Lsn; use crate::{ - remote_storage::{ - storage_sync::{ - index::RelativePath, - test_utils::{create_local_timeline, dummy_metadata}, - }, - LocalFs, - }, repository::repo_harness::{RepoHarness, TIMELINE_ID}, + storage_sync::{ + index::RelativePath, + test_utils::{create_local_timeline, dummy_metadata}, + }, }; use super::{upload_index_part, *}; @@ -233,7 +235,10 @@ mod tests { let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b"]; - let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?; + let storage = LocalFs::new( + tempdir()?.path().to_path_buf(), + harness.conf.workdir.clone(), + )?; let current_retries = 3; let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -315,7 +320,7 @@ mod tests { let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a1", "b1"]; - let storage = LocalFs::new(tempdir()?.path().to_owned(), &harness.conf.workdir)?; + let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; let current_retries = 5; let metadata = dummy_metadata(Lsn(0x40)); @@ -403,7 +408,7 @@ mod tests { let harness = RepoHarness::create("test_upload_index_part")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), &harness.conf.workdir)?; + let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; let metadata = dummy_metadata(Lsn(0x40)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 507e749e8c..20a723b5b5 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -4,8 +4,9 @@ use crate::config::PageServerConf; 
 use crate::layered_repository::LayeredRepository;
 use crate::pgdatadir_mapping::DatadirTimeline;
-use crate::remote_storage::{self, LocalTimelineInitStatus, RemoteIndex, SyncStartupData};
 use crate::repository::{Repository, TimelineSyncStatusUpdate};
+use crate::storage_sync::index::RemoteIndex;
+use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
 use crate::tenant_config::TenantConfOpt;
 use crate::thread_mgr;
 use crate::thread_mgr::ThreadKind;
@@ -96,7 +97,7 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result,
+    remote_storage: &S3Bucket,
+    listing: &HashSet,
     dir_path: &Path,
     conf: &SafeKeeperConf,
 ) -> anyhow::Result {
@@ -55,17 +57,12 @@ async fn offload_files(
             && IsXLogFileName(entry.file_name().to_str().unwrap())
             && entry.metadata().unwrap().created().unwrap() <= horizon
         {
-            let relpath = path.strip_prefix(&conf.workdir).unwrap();
-            let s3path = String::from("walarchive/") + relpath.to_str().unwrap();
-            if !listing.contains(&s3path) {
+            let remote_path = remote_storage.remote_object_id(path)?;
+            if !listing.contains(&remote_path) {
                 let file = File::open(&path).await?;
-                client
-                    .put_object(PutObjectRequest {
-                        body: Some(StreamingBody::new(ReaderStream::new(file))),
-                        bucket: bucket_name.to_string(),
-                        key: s3path,
-                        ..PutObjectRequest::default()
-                    })
+                let file_length = file.metadata().await?.len() as usize;
+                remote_storage
+                    .upload(BufReader::new(file), file_length, &remote_path, None)
                     .await?;
 
                 fs::remove_file(&path).await?;
@@ -77,58 +74,34 @@
 }
 
 async fn main_loop(conf: &SafeKeeperConf) -> anyhow::Result<()> {
-    let region = Region::Custom {
-        name: env::var("S3_REGION").context("S3_REGION env var is not set")?,
-        endpoint: env::var("S3_ENDPOINT").context("S3_ENDPOINT env var is not set")?,
+    let remote_storage = match GenericRemoteStorage::new(
+        conf.workdir.clone(),
+        &RemoteStorageConfig {
+            max_concurrent_syncs: NonZeroUsize::new(10).unwrap(),
+            max_sync_errors: NonZeroU32::new(1).unwrap(),
+            storage: remote_storage::RemoteStorageKind::AwsS3(S3Config {
+                bucket_name: "zenith-testbucket".to_string(),
+                bucket_region: env::var("S3_REGION").context("S3_REGION env var is not set")?,
+                prefix_in_bucket: Some("walarchive/".to_string()),
+                endpoint: Some(env::var("S3_ENDPOINT").context("S3_ENDPOINT env var is not set")?),
+                concurrency_limit: NonZeroUsize::new(20).unwrap(),
+            }),
+        },
+    )? {
+        GenericRemoteStorage::Local(_) => {
+            bail!("Unexpected: got local storage for the remote config")
+        }
+        GenericRemoteStorage::S3(remote_storage) => remote_storage,
     };
-    let client = S3Client::new_with(
-        HttpClient::new().context("Failed to create S3 http client")?,
-        StaticProvider::new_minimal(
-            env::var("S3_ACCESSKEY").context("S3_ACCESSKEY env var is not set")?,
-            env::var("S3_SECRET").context("S3_SECRET env var is not set")?,
-        ),
-        region,
-    );
-
-    let bucket_name = "zenith-testbucket";
-
     loop {
-        let listing = gather_wal_entries(&client, bucket_name).await?;
-        let n = offload_files(&client, bucket_name, &listing, &conf.workdir, conf).await?;
-        info!("Offload {} files to S3", n);
+        let listing = remote_storage
+            .list()
+            .await?
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let n = offload_files(&remote_storage, &listing, &conf.workdir, conf).await?;
+        info!("Offload {n} files to S3");
 
         sleep(conf.ttl.unwrap()).await;
     }
 }
-
-async fn gather_wal_entries(
-    client: &S3Client,
-    bucket_name: &str,
-) -> anyhow::Result<HashSet<String>> {
-    let mut document_keys = HashSet::new();
-
-    let mut continuation_token = None::<String>;
-    loop {
-        let response = client
-            .list_objects_v2(ListObjectsV2Request {
-                bucket: bucket_name.to_string(),
-                prefix: Some("walarchive/".to_string()),
-                continuation_token,
-                ..ListObjectsV2Request::default()
-            })
-            .await?;
-        document_keys.extend(
-            response
-                .contents
-                .unwrap_or_default()
-                .into_iter()
-                .filter_map(|o| o.key),
-        );
-
-        continuation_token = response.continuation_token;
-        if continuation_token.is_none() {
-            break;
-        }
-    }
-    Ok(document_keys)
-}
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 7acf0552df..3bb7c606d3 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -472,20 +472,16 @@ class ZenithEnvBuilder:
             mock_endpoint = self.s3_mock_server.endpoint()
             mock_region = self.s3_mock_server.region()
-            mock_access_key = self.s3_mock_server.access_key()
-            mock_secret_key = self.s3_mock_server.secret_key()
 
             boto3.client(
                 's3',
                 endpoint_url=mock_endpoint,
                 region_name=mock_region,
-                aws_access_key_id=mock_access_key,
-                aws_secret_access_key=mock_secret_key,
+                aws_access_key_id=self.s3_mock_server.access_key(),
+                aws_secret_access_key=self.s3_mock_server.secret_key(),
             ).create_bucket(Bucket=bucket_name)
             self.pageserver_remote_storage = S3Storage(bucket=bucket_name,
                                                        endpoint=mock_endpoint,
-                                                       region=mock_region,
-                                                       access_key=mock_access_key,
-                                                       secret_key=mock_secret_key)
+                                                       region=mock_region)
 
     def __enter__(self):
         return self
@@ -811,8 +807,6 @@ class LocalFsStorage:
 
 class S3Storage:
     bucket: str
     region: str
-    access_key: Optional[str]
-    secret_key: Optional[str]
    endpoint: Optional[str]
 
@@ -998,7 +992,14 @@ class ZenithCli:
         append_pageserver_param_overrides(start_args,
                                           self.env.pageserver.remote_storage,
                                           self.env.pageserver.config_override)
-        return self.raw_cli(start_args)
+
+        s3_env_vars = None
+        if self.env.s3_mock_server:
+            s3_env_vars = {
+                'AWS_ACCESS_KEY_ID': self.env.s3_mock_server.access_key(),
+                'AWS_SECRET_ACCESS_KEY': self.env.s3_mock_server.secret_key(),
+            }
+        return self.raw_cli(start_args, extra_env_vars=s3_env_vars)
 
     def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
         cmd = ['pageserver', 'stop']
@@ -1093,6 +1094,7 @@ class ZenithCli:
 
     def raw_cli(self,
                 arguments: List[str],
+                extra_env_vars: Optional[Dict[str, str]] = None,
                 check_return_code=True) -> 'subprocess.CompletedProcess[str]':
         """
         Run "zenith" with the specified arguments.
@@ -1117,9 +1119,10 @@ class ZenithCli:
         env_vars = os.environ.copy()
         env_vars['ZENITH_REPO_DIR'] = str(self.env.repo_dir)
         env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
-
         if self.env.rust_log_override is not None:
             env_vars['RUST_LOG'] = self.env.rust_log_override
+        for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
+            env_vars[extra_env_key] = extra_env_value
 
         # Pass coverage settings
         var = 'LLVM_PROFILE_FILE'
@@ -1217,10 +1220,6 @@ def append_pageserver_param_overrides(
             pageserver_storage_override = f"bucket_name='{pageserver_remote_storage.bucket}',\
 bucket_region='{pageserver_remote_storage.region}'"
 
-            if pageserver_remote_storage.access_key is not None:
-                pageserver_storage_override += f",access_key_id='{pageserver_remote_storage.access_key}'"
-            if pageserver_remote_storage.secret_key is not None:
-                pageserver_storage_override += f",secret_access_key='{pageserver_remote_storage.secret_key}'"
             if pageserver_remote_storage.endpoint is not None:
                 pageserver_storage_override += f",endpoint='{pageserver_remote_storage.endpoint}'"
 
diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml
index 2bb22f2d3b..92877faef7 100644
--- a/workspace_hack/Cargo.toml
+++ b/workspace_hack/Cargo.toml
@@ -21,7 +21,13 @@ chrono = { version = "0.4", features = ["clock", "libc", "oldtime", "serde", "st
 clap = { version = "2", features = ["ansi_term", "atty", "color", "strsim", "suggestions", "vec_map"] }
 either = { version = "1", features = ["use_std"] }
 fail = { version = "0.5", default-features = false, features = ["failpoints"] }
+futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
+futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
+futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
+generic-array = { version = "0.14", default-features = false, features = ["more_lengths"] }
 hashbrown = { version = "0.11", features = ["ahash", "inline-more", "raw"] }
+hex = { version = "0.4", features = ["alloc", "serde", "std"] }
+hyper = { version = "0.14", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] }
 indexmap = { version = "1", default-features = false, features = ["std"] }
 itoa = { version = "0.4", features = ["i128", "std"] }
 libc = { version = "0.2", features = ["extra_traits", "std"] }
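
For reference, a minimal end-to-end sketch of the call pattern the hunks above rely on, mirroring the safekeeper offload loop: resolve a remote object id for a local file, upload it, and list what is already stored so repeated runs can skip it. This is not part of the patch; it assumes a LocalFs backend, made-up paths, a #[tokio::main] wrapper, and the upload signature (reader, length, object id, optional metadata) exactly as the hunks above invoke it, so treat it as an illustration of the intended usage rather than documented API.

use std::collections::HashSet;
use std::path::PathBuf;

use anyhow::Context;
use remote_storage::{LocalFs, RemoteStorage};
use tokio::{fs::File, io::BufReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Illustrative paths only.
    let workdir = PathBuf::from("/tmp/safekeeper_workdir");
    let storage_root = PathBuf::from("/tmp/remote_storage_root");

    // LocalFs::new now takes owned paths, as the updated tests above show.
    let storage = LocalFs::new(storage_root, workdir.clone())?;

    // Map a local file to its remote counterpart and upload it,
    // the same way offload_files does in safekeeper/src/s3_offload.rs.
    let local_path = workdir.join("000000010000000000000001");
    let remote_path = storage
        .remote_object_id(&local_path)
        .context("failed to resolve the remote object id")?;

    let file = File::open(&local_path).await?;
    let file_length = file.metadata().await?.len() as usize;
    storage
        .upload(BufReader::new(file), file_length, &remote_path, None)
        .await?;

    // list() returns every remote object id; collecting them into a set
    // lets a caller skip files that were already offloaded.
    let already_uploaded: HashSet<_> = storage.list().await?.into_iter().collect();
    println!("{} objects in remote storage", already_uploaded.len());
    Ok(())
}

The same pattern works against the S3 backend by building a GenericRemoteStorage from a RemoteStorageConfig, as the new main_loop in s3_offload.rs does, since both backends expose the shared RemoteStorage trait.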