From 1d5abf1253b6e770fdcf15c37d17687b16afaf3e Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Mon, 6 Sep 2021 23:03:39 +0300 Subject: [PATCH] Initial version of the relish storage --- Cargo.lock | 2 + pageserver/Cargo.toml | 6 +- pageserver/src/bin/pageserver.rs | 101 ++++++++++- pageserver/src/layered_repository.rs | 43 +++-- .../src/layered_repository/delta_layer.rs | 36 ++-- .../src/layered_repository/image_layer.rs | 33 ++-- .../src/layered_repository/inmemory_layer.rs | 4 + .../src/layered_repository/storage_layer.rs | 4 + pageserver/src/lib.rs | 29 ++++ pageserver/src/relish_storage.rs | 54 ++++++ pageserver/src/relish_storage/local_fs.rs | 158 ++++++++++++++++++ pageserver/src/relish_storage/rust_s3.rs | 144 ++++++++++++++++ .../src/relish_storage/storage_uploader.rs | 116 +++++++++++++ pageserver/src/repository.rs | 1 - 14 files changed, 681 insertions(+), 50 deletions(-) create mode 100644 pageserver/src/relish_storage.rs create mode 100644 pageserver/src/relish_storage/local_fs.rs create mode 100644 pageserver/src/relish_storage/rust_s3.rs create mode 100644 pageserver/src/relish_storage/storage_uploader.rs diff --git a/Cargo.lock b/Cargo.lock index 749dcf43d0..6fb4545e50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1173,6 +1173,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "bookfile", "byteorder", "bytes", @@ -1193,6 +1194,7 @@ dependencies = [ "rand", "regex", "routerify", + "rust-s3", "scopeguard", "serde", "serde_json", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index edc9d7c4f6..c92c7d3bc2 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -4,8 +4,6 @@ version = "0.1.0" authors = ["Stas Kelvich "] edition = "2018" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] bookfile = "^0.3" chrono = "0.4.19" @@ -19,7 +17,7 @@ lazy_static = "1.4.0" log = "0.4.14" clap = "2.33.0" daemonize = "0.4.1" -tokio = { version = "1.11", features = ["process", "macros"] } +tokio = { version = "1.11", features = ["process", "macros", "fs"] } postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } @@ -34,6 +32,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1" toml = "0.5" scopeguard = "1.1.0" +rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] } +async-trait = "0.1" postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index adefc1b6b5..0a3dcf4fdb 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -15,11 +15,14 @@ use std::{ }; use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType}; -use anyhow::{ensure, Result}; +use anyhow::{bail, ensure, Result}; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; -use pageserver::{branches, http, page_service, tenant_mgr, PageServerConf, LOG_FILE_NAME}; +use pageserver::{ + branches, http, page_service, tenant_mgr, PageServerConf, RelishStorageConfig, S3Config, + LOG_FILE_NAME, +}; use zenith_utils::http::endpoint; /// String arguments that can be declared via CLI or config file @@ -34,6 +37,23 @@ struct CfgFileParams { pg_distrib_dir: Option, auth_validation_public_key_path: Option, auth_type: Option, + // see https://github.com/alexcrichton/toml-rs/blob/6c162e6562c3e432bf04c82a3d1d789d80761a86/examples/enum_external.rs for enum deserialisation examples + relish_storage: Option, +} + +#[derive(Serialize, Deserialize, Clone)] +enum RelishStorage { + Local { + local_path: String, + }, + AwsS3 { + bucket_name: String, + bucket_region: String, + #[serde(skip_serializing)] + access_key_id: Option, + #[serde(skip_serializing)] + secret_access_key: Option, + }, } impl CfgFileParams { @@ -43,6 +63,21 @@ impl CfgFileParams { arg_matches.value_of(arg_name).map(str::to_owned) }; + let relish_storage = if let Some(local_path) = get_arg("relish-storage-local-path") { + Some(RelishStorage::Local { local_path }) + } else if let Some((bucket_name, bucket_region)) = + get_arg("relish-storage-s3-bucket").zip(get_arg("relish-storage-region")) + { + Some(RelishStorage::AwsS3 { + bucket_name, + bucket_region, + access_key_id: get_arg("relish-storage-access-key"), + secret_access_key: get_arg("relish-storage-secret-access-key"), + }) + } else { + None + }; + Self { listen_pg_addr: get_arg("listen-pg"), listen_http_addr: get_arg("listen-http"), @@ -53,6 +88,7 @@ impl CfgFileParams { pg_distrib_dir: get_arg("postgres-distrib"), auth_validation_public_key_path: get_arg("auth-validation-public-key-path"), auth_type: get_arg("auth-type"), + relish_storage, } } @@ -71,6 +107,7 @@ impl CfgFileParams { .auth_validation_public_key_path .or(other.auth_validation_public_key_path), auth_type: self.auth_type.or(other.auth_type), + relish_storage: self.relish_storage.or(other.relish_storage), } } @@ -124,7 +161,7 @@ impl CfgFileParams { })?; if !pg_distrib_dir.join("bin/postgres").exists() { - anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); + bail!("Can't find postgres binary at {:?}", pg_distrib_dir); } if auth_type == AuthType::ZenithJWT { @@ -139,6 +176,26 @@ impl CfgFileParams { ); } + let relish_storage_config = + self.relish_storage + .as_ref() + .map(|storage_params| match storage_params.clone() { + RelishStorage::Local { local_path } => { + RelishStorageConfig::LocalFs(PathBuf::from(local_path)) + } + RelishStorage::AwsS3 { + bucket_name, + bucket_region, + access_key_id, + secret_access_key, + } => RelishStorageConfig::AwsS3(S3Config { + bucket_name, + bucket_region, + access_key_id, + secret_access_key, + }), + }); + Ok(PageServerConf { daemonize: false, @@ -157,6 +214,7 @@ impl CfgFileParams { auth_validation_public_key_path, auth_type, + relish_storage_config, }) } } @@ -248,6 +306,43 @@ fn main() -> Result<()> { .takes_value(true) .help("Authentication scheme type. One of: Trust, MD5, ZenithJWT"), ) + .arg( + Arg::with_name("relish-storage-local-path") + .long("relish-storage-local-path") + .takes_value(true) + .help("Path to the local directory, to be used as an external relish storage") + .conflicts_with_all(&[ + "relish-storage-s3-bucket", + "relish-storage-region", + "relish-storage-access-key", + "relish-storage-secret-access-key", + ]), + ) + .arg( + Arg::with_name("relish-storage-s3-bucket") + .long("relish-storage-s3-bucket") + .takes_value(true) + .help("Name of the AWS S3 bucket to use an external relish storage") + .requires("relish-storage-region"), + ) + .arg( + Arg::with_name("relish-storage-region") + .long("relish-storage-region") + .takes_value(true) + .help("Region of the AWS S3 bucket"), + ) + .arg( + Arg::with_name("relish-storage-access-key") + .long("relish-storage-access-key") + .takes_value(true) + .help("Credentials to access the AWS S3 bucket"), + ) + .arg( + Arg::with_name("relish-storage-secret-access-key") + .long("relish-storage-secret-access-key") + .takes_value(true) + .help("Credentials to access the AWS S3 bucket"), + ) .get_matches(); let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index baba96948a..3634ce2e7f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -24,7 +24,7 @@ use std::collections::{BTreeSet, HashSet}; use std::fs::File; use std::io::Write; use std::ops::Bound::Included; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; @@ -33,6 +33,7 @@ use std::{fs, thread}; use crate::layered_repository::inmemory_layer::FreezeLayers; use crate::relish::*; +use crate::relish_storage::storage_uploader::QueueBasedRelishUploader; use crate::repository::{GcResult, Repository, Timeline, WALRecord}; use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; @@ -109,6 +110,7 @@ pub struct LayeredRepository { timelines: Mutex>>, walredo_mgr: Arc, + relish_uploader: Option>, } /// Public interface @@ -139,7 +141,8 @@ impl Repository for LayeredRepository { None, timelineid, self.tenantid, - self.walredo_mgr.clone(), + Arc::clone(&self.walredo_mgr), + self.relish_uploader.as_ref().map(Arc::clone), 0, )?; @@ -230,7 +233,8 @@ impl LayeredRepository { ancestor, timelineid, self.tenantid, - self.walredo_mgr.clone(), + Arc::clone(&self.walredo_mgr), + self.relish_uploader.as_ref().map(Arc::clone), 0, // init with 0 and update after layers are loaded )?; @@ -275,6 +279,9 @@ impl LayeredRepository { conf, timelines: Mutex::new(HashMap::new()), walredo_mgr, + relish_uploader: conf.relish_storage_config.as_ref().map(|config| { + Arc::new(QueueBasedRelishUploader::new(config, &conf.workdir).unwrap()) + }), } } @@ -349,7 +356,7 @@ impl LayeredRepository { timelineid: ZTimelineId, tenantid: ZTenantId, data: &TimelineMetadata, - ) -> Result<()> { + ) -> Result { let path = conf.timeline_path(&timelineid, &tenantid).join("metadata"); let mut file = File::create(&path)?; @@ -357,7 +364,7 @@ impl LayeredRepository { file.write_all(&TimelineMetadata::ser(data)?)?; - Ok(()) + Ok(path) } fn load_metadata( @@ -536,6 +543,8 @@ pub struct LayeredTimeline { // WAL redo manager walredo_mgr: Arc, + relish_uploader: Option>, + // What page versions do we hold in the repository? If we get a // request > last_record_lsn, we need to wait until we receive all // the WAL up to the request. The SeqWait provides functions for @@ -943,6 +952,7 @@ impl LayeredTimeline { /// Open a Timeline handle. /// /// Loads the metadata for the timeline into memory, but not the layer map. + #[allow(clippy::too_many_arguments)] fn new( conf: &'static PageServerConf, metadata: TimelineMetadata, @@ -950,6 +960,7 @@ impl LayeredTimeline { timelineid: ZTimelineId, tenantid: ZTenantId, walredo_mgr: Arc, + relish_uploader: Option>, current_logical_size: usize, ) -> Result { let timeline = LayeredTimeline { @@ -959,6 +970,7 @@ impl LayeredTimeline { layers: Mutex::new(LayerMap::default()), walredo_mgr, + relish_uploader, // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. last_record_lsn: SeqWait::new(RecordLsn { @@ -1317,6 +1329,11 @@ impl LayeredTimeline { drop(layers); let new_historics = frozen.write_to_disk(self)?; layers = self.layers.lock().unwrap(); + if let Some(relish_uploader) = &self.relish_uploader { + for label_path in new_historics.iter().filter_map(|layer| layer.path()) { + relish_uploader.schedule_upload(self.timelineid, label_path); + } + } // Finally, replace the frozen in-memory layer with the new on-disk layers layers.remove_historic(frozen.as_ref()); @@ -1353,12 +1370,10 @@ impl LayeredTimeline { // don't remember what the correct value that corresponds to some old // LSN is. But if we flush everything, then the value corresponding // current 'last_record_lsn' is correct and we can store it on disk. - let ondisk_prev_record_lsn = { - if disk_consistent_lsn == last_record_lsn { - Some(prev_record_lsn) - } else { - None - } + let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn { + Some(prev_record_lsn) + } else { + None }; let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid); @@ -1369,7 +1384,11 @@ impl LayeredTimeline { ancestor_timeline: ancestor_timelineid, ancestor_lsn: self.ancestor_lsn, }; - LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?; + let metadata_path = + LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?; + if let Some(relish_uploader) = &self.relish_uploader { + relish_uploader.schedule_upload(self.timelineid, metadata_path); + } // Also update the in-memory copy self.disk_consistent_lsn.store(disk_consistent_lsn); diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 083fa3e0d0..03f9b0350f 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -161,6 +161,20 @@ impl Layer for DeltaLayer { ) } + fn path(&self) -> Option { + Some(Self::path_for( + &self.path_or_conf, + self.timelineid, + self.tenantid, + &DeltaFileName { + seg: self.seg, + start_lsn: self.start_lsn, + end_lsn: self.end_lsn, + dropped: self.dropped, + }, + )) + } + /// Look up given page in the cache. fn get_page_reconstruct_data( &self, @@ -271,7 +285,9 @@ impl Layer for DeltaLayer { fn delete(&self) -> Result<()> { // delete underlying file - fs::remove_file(self.path())?; + if let Some(path) = self.path() { + fs::remove_file(path)?; + } Ok(()) } @@ -321,20 +337,6 @@ impl Layer for DeltaLayer { } impl DeltaLayer { - fn path(&self) -> PathBuf { - Self::path_for( - &self.path_or_conf, - self.timelineid, - self.tenantid, - &DeltaFileName { - seg: self.seg, - start_lsn: self.start_lsn, - end_lsn: self.end_lsn, - dropped: self.dropped, - }, - ) - } - fn path_for( path_or_conf: &PathOrConf, timelineid: ZTimelineId, @@ -386,7 +388,9 @@ impl DeltaLayer { let mut inner = delta_layer.inner.lock().unwrap(); // Write the in-memory btreemaps into a file - let path = delta_layer.path(); + let path = delta_layer + .path() + .expect("DeltaLayer is supposed to have a layer path on disk"); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 5397447482..4fdd7dc45c 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -97,6 +97,18 @@ impl Layer for ImageLayer { ) } + fn path(&self) -> Option { + Some(Self::path_for( + &self.path_or_conf, + self.timelineid, + self.tenantid, + &ImageFileName { + seg: self.seg, + lsn: self.lsn, + }, + )) + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } @@ -183,7 +195,9 @@ impl Layer for ImageLayer { fn delete(&self) -> Result<()> { // delete underlying file - fs::remove_file(self.path())?; + if let Some(path) = self.path() { + fs::remove_file(path)?; + } Ok(()) } @@ -214,18 +228,6 @@ impl Layer for ImageLayer { } impl ImageLayer { - fn path(&self) -> PathBuf { - Self::path_for( - &self.path_or_conf, - self.timelineid, - self.tenantid, - &ImageFileName { - seg: self.seg, - lsn: self.lsn, - }, - ) - } - fn path_for( path_or_conf: &PathOrConf, timelineid: ZTimelineId, @@ -271,8 +273,9 @@ impl ImageLayer { let inner = layer.inner.lock().unwrap(); // Write the images into a file - let path = layer.path(); - + let path = layer + .path() + .expect("ImageLayer is supposed to have a layer path on disk"); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? let file = File::create(&path)?; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 43ecf395e6..7b47a701ad 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -116,6 +116,10 @@ impl Layer for InMemoryLayer { PathBuf::from(format!("inmem-{}", delta_filename)) } + fn path(&self) -> Option { + None + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 2cb4850c10..48bc87e1c5 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -125,6 +125,10 @@ pub trait Layer: Send + Sync { /// Is the segment represented by this layer dropped by PostgreSQL? fn is_dropped(&self) -> bool; + /// Gets the physical location of the layer on disk. + /// Some layers, such as in-memory, might not have the location. + fn path(&self) -> Option; + /// Filename used to store this layer on disk. (Even in-memory layers /// implement this, to print a handy unique identifier for the layer for /// log messages, even though they're never not on disk.) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index d2dcdc427f..32d2d66e6b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -13,6 +13,7 @@ pub mod http; pub mod layered_repository; pub mod page_service; pub mod relish; +mod relish_storage; pub mod repository; pub mod restore_local_repo; pub mod tenant_mgr; @@ -79,6 +80,7 @@ pub struct PageServerConf { pub auth_type: AuthType, pub auth_validation_public_key_path: Option, + pub relish_storage_config: Option, } impl PageServerConf { @@ -158,6 +160,33 @@ impl PageServerConf { pg_distrib_dir: "".into(), auth_type: AuthType::Trust, auth_validation_public_key_path: None, + relish_storage_config: None, } } } + +/// External relish storage configuration, enough for creating a client for that storage. +#[derive(Debug, Clone)] +pub enum RelishStorageConfig { + /// Root folder to place all stored relish data into. + LocalFs(PathBuf), + AwsS3(S3Config), +} + +/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write). +#[derive(Clone)] +pub struct S3Config { + pub bucket_name: String, + pub bucket_region: String, + pub access_key_id: Option, + pub secret_access_key: Option, +} + +impl std::fmt::Debug for S3Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("S3Config") + .field("bucket_name", &self.bucket_name) + .field("bucket_region", &self.bucket_region) + .finish() + } +} diff --git a/pageserver/src/relish_storage.rs b/pageserver/src/relish_storage.rs new file mode 100644 index 0000000000..11e73711a3 --- /dev/null +++ b/pageserver/src/relish_storage.rs @@ -0,0 +1,54 @@ +//! Abstractions for the page server to store its relish layer data in the external storage. +//! +//! Main purpose of this module subtree is to provide a set of abstractions to manage the storage state +//! in a way, optimal for page server. +//! +//! The abstractions hide multiple custom external storage API implementations, +//! such as AWS S3, local filesystem, etc., located in the submodules. + +mod local_fs; +mod rust_s3; +/// A queue and the background machinery behind it to upload +/// local page server layer files to external storage. +pub mod storage_uploader; + +use std::path::Path; + +use anyhow::Context; + +/// Storage (potentially remote) API to manage its state. +#[async_trait::async_trait] +pub trait RelishStorage: Send + Sync { + type RelishStoragePath; + + fn derive_destination( + page_server_workdir: &Path, + relish_local_path: &Path, + ) -> anyhow::Result; + + async fn list_relishes(&self) -> anyhow::Result>; + + async fn download_relish( + &self, + from: &Self::RelishStoragePath, + to: &Path, + ) -> anyhow::Result<()>; + + async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()>; + + async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()>; +} + +fn strip_workspace_prefix<'a>( + page_server_workdir: &'a Path, + relish_local_path: &'a Path, +) -> anyhow::Result<&'a Path> { + relish_local_path + .strip_prefix(page_server_workdir) + .with_context(|| { + format!( + "Unexpected: relish local path '{}' is not relevant to server workdir", + relish_local_path.display(), + ) + }) +} diff --git a/pageserver/src/relish_storage/local_fs.rs b/pageserver/src/relish_storage/local_fs.rs new file mode 100644 index 0000000000..78ee858a5b --- /dev/null +++ b/pageserver/src/relish_storage/local_fs.rs @@ -0,0 +1,158 @@ +//! Local filesystem relish storage. +//! +//! Page server already stores layer data on the server, when freezing it. +//! This storage serves a way to +//! +//! * test things locally simply +//! * allow to compabre both binary sets +//! * help validating the relish storage API + +use std::{ + future::Future, + path::{Path, PathBuf}, + pin::Pin, +}; + +use anyhow::{bail, Context}; + +use super::{strip_workspace_prefix, RelishStorage}; + +pub struct LocalFs { + root: PathBuf, +} + +impl LocalFs { + /// Atetmpts to create local FS relish storage, also creates the directory provided, if not exists. + pub fn new(root: PathBuf) -> anyhow::Result { + if !root.exists() { + std::fs::create_dir_all(&root).with_context(|| { + format!( + "Failed to create all directories in the given root path {}", + root.display(), + ) + })?; + } + Ok(Self { root }) + } + + fn resolve_in_storage(&self, path: &Path) -> anyhow::Result { + if path.is_relative() { + Ok(self.root.join(path)) + } else if path.starts_with(&self.root) { + Ok(path.to_path_buf()) + } else { + bail!( + "Path '{}' does not belong to the current storage", + path.display() + ) + } + } +} + +#[async_trait::async_trait] +impl RelishStorage for LocalFs { + type RelishStoragePath = PathBuf; + + fn derive_destination( + page_server_workdir: &Path, + relish_local_path: &Path, + ) -> anyhow::Result { + Ok(strip_workspace_prefix(page_server_workdir, relish_local_path)?.to_path_buf()) + } + + async fn list_relishes(&self) -> anyhow::Result> { + Ok(get_all_files(&self.root).await?.into_iter().collect()) + } + + async fn download_relish( + &self, + from: &Self::RelishStoragePath, + to: &Path, + ) -> anyhow::Result<()> { + let file_path = self.resolve_in_storage(from)?; + if file_path.exists() && file_path.is_file() { + create_target_directory(to).await?; + tokio::fs::copy(file_path, to).await?; + Ok(()) + } else { + bail!( + "File '{}' either does not exist or is not a file", + file_path.display() + ) + } + } + + async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()> { + let file_path = self.resolve_in_storage(path)?; + if file_path.exists() && file_path.is_file() { + Ok(tokio::fs::remove_file(file_path).await?) + } else { + bail!( + "File '{}' either does not exist or is not a file", + file_path.display() + ) + } + } + + async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> { + let target_file_path = self.resolve_in_storage(to)?; + create_target_directory(&target_file_path).await?; + + tokio::fs::copy(&from, &target_file_path) + .await + .with_context(|| { + format!( + "Failed to upload relish '{}' to local storage", + from.display(), + ) + })?; + Ok(()) + } +} + +fn get_all_files<'a, P>( + directory_path: P, +) -> Pin>> + Send + Sync + 'a>> +where + P: AsRef + Send + Sync + 'a, +{ + Box::pin(async move { + let directory_path = directory_path.as_ref(); + if directory_path.exists() { + if directory_path.is_dir() { + let mut paths = Vec::new(); + let mut dir_contents = tokio::fs::read_dir(directory_path).await?; + while let Some(dir_entry) = dir_contents.next_entry().await? { + let file_type = dir_entry.file_type().await?; + let entry_path = dir_entry.path(); + if file_type.is_symlink() { + log::debug!("{:?} us a symlink, skipping", entry_path) + } else if file_type.is_dir() { + paths.extend(get_all_files(entry_path).await?.into_iter()) + } else { + paths.push(dir_entry.path()); + } + } + Ok(paths) + } else { + bail!("Path '{}' is not a directory", directory_path.display()) + } + } else { + Ok(Vec::new()) + } + }) +} + +async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> { + let target_dir = match target_file_path.parent() { + Some(parent_dir) => parent_dir, + None => bail!( + "Relish path '{}' has no parent directory", + target_file_path.display() + ), + }; + if !target_dir.exists() { + tokio::fs::create_dir_all(target_dir).await?; + } + Ok(()) +} diff --git a/pageserver/src/relish_storage/rust_s3.rs b/pageserver/src/relish_storage/rust_s3.rs new file mode 100644 index 0000000000..e98bf8949f --- /dev/null +++ b/pageserver/src/relish_storage/rust_s3.rs @@ -0,0 +1,144 @@ +//! A wrapper around AWS S3 client library `rust_s3` to be used a relish storage. + +use std::path::Path; + +use anyhow::Context; +use s3::{bucket::Bucket, creds::Credentials, region::Region}; + +use crate::{relish_storage::strip_workspace_prefix, S3Config}; + +use super::RelishStorage; + +const S3_FILE_SEPARATOR: char = '/'; + +#[derive(Debug)] +pub struct S3ObjectKey(String); + +impl S3ObjectKey { + fn key(&self) -> &str { + &self.0 + } +} + +/// AWS S3 relish storage. +pub struct RustS3 { + bucket: Bucket, +} + +impl RustS3 { + /// Creates the relish storage, errors if incorrect AWS S3 configuration provided. + pub fn new(aws_config: &S3Config) -> anyhow::Result { + let region = aws_config + .bucket_region + .parse::() + .context("Failed to parse the s3 region from config")?; + let credentials = Credentials::new( + aws_config.access_key_id.as_deref(), + aws_config.secret_access_key.as_deref(), + None, + None, + None, + ) + .context("Failed to create the s3 credentials")?; + Ok(Self { + bucket: Bucket::new_with_path_style( + aws_config.bucket_name.as_str(), + region, + credentials, + ) + .context("Failed to create the s3 bucket")?, + }) + } +} + +#[async_trait::async_trait] +impl RelishStorage for RustS3 { + type RelishStoragePath = S3ObjectKey; + + fn derive_destination( + page_server_workdir: &Path, + relish_local_path: &Path, + ) -> anyhow::Result { + let relative_path = strip_workspace_prefix(page_server_workdir, relish_local_path)?; + let mut key = String::new(); + for segment in relative_path { + key.push(S3_FILE_SEPARATOR); + key.push_str(&segment.to_string_lossy()); + } + Ok(S3ObjectKey(key)) + } + + async fn list_relishes(&self) -> anyhow::Result> { + let list_response = self + .bucket + .list(String::new(), None) + .await + .context("Failed to list s3 objects")?; + + Ok(list_response + .into_iter() + .flat_map(|response| response.contents) + .map(|s3_object| S3ObjectKey(s3_object.key)) + .collect()) + } + + async fn download_relish( + &self, + from: &Self::RelishStoragePath, + to: &Path, + ) -> anyhow::Result<()> { + let mut target_file = std::fs::OpenOptions::new() + .write(true) + .open(to) + .with_context(|| format!("Failed to open target s3 destination at {}", to.display()))?; + let code = self + .bucket + .get_object_stream(from.key(), &mut target_file) + .await + .with_context(|| format!("Failed to download s3 object with key {}", from.key()))?; + if code != 200 { + Err(anyhow::format_err!( + "Received non-200 exit code during downloading object from directory, code: {}", + code + )) + } else { + Ok(()) + } + } + + async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()> { + let (_, code) = self + .bucket + .delete_object(path.key()) + .await + .with_context(|| format!("Failed to delete s3 object with key {}", path.key()))?; + if code != 200 { + Err(anyhow::format_err!( + "Received non-200 exit code during deleting object with key '{}', code: {}", + path.key(), + code + )) + } else { + Ok(()) + } + } + + async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> { + let mut local_file = tokio::fs::OpenOptions::new().read(true).open(from).await?; + + let code = self + .bucket + .put_object_stream(&mut local_file, to.key()) + .await + .with_context(|| format!("Failed to create s3 object with key {}", to.key()))?; + if code != 200 { + Err(anyhow::format_err!( + "Received non-200 exit code during creating object with key '{}', code: {}", + to.key(), + code + )) + } else { + Ok(()) + } + } +} diff --git a/pageserver/src/relish_storage/storage_uploader.rs b/pageserver/src/relish_storage/storage_uploader.rs new file mode 100644 index 0000000000..2d0889173d --- /dev/null +++ b/pageserver/src/relish_storage/storage_uploader.rs @@ -0,0 +1,116 @@ +use std::{ + collections::VecDeque, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + thread, +}; + +use zenith_utils::zid::ZTimelineId; + +use crate::{relish_storage::RelishStorage, RelishStorageConfig}; + +use super::{local_fs::LocalFs, rust_s3::RustS3}; + +pub struct QueueBasedRelishUploader { + upload_queue: Arc>>, +} + +impl QueueBasedRelishUploader { + pub fn new( + config: &RelishStorageConfig, + page_server_workdir: &'static Path, + ) -> anyhow::Result { + let upload_queue = Arc::new(Mutex::new(VecDeque::new())); + let _handle = match config { + RelishStorageConfig::LocalFs(root) => { + let relish_storage = LocalFs::new(root.clone())?; + create_upload_thread( + Arc::clone(&upload_queue), + relish_storage, + page_server_workdir, + )? + } + RelishStorageConfig::AwsS3(s3_config) => { + let relish_storage = RustS3::new(s3_config)?; + create_upload_thread( + Arc::clone(&upload_queue), + relish_storage, + page_server_workdir, + )? + } + }; + + Ok(Self { upload_queue }) + } + + pub fn schedule_upload(&self, timeline_id: ZTimelineId, relish_path: PathBuf) { + self.upload_queue + .lock() + .unwrap() + .push_back((timeline_id, relish_path)) + } +} + +fn create_upload_thread>( + upload_queue: Arc>>, + relish_storage: S, + page_server_workdir: &'static Path, +) -> std::io::Result> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + thread::Builder::new() + .name("Queue based relish uploader".to_string()) + .spawn(move || loop { + runtime.block_on(async { + upload_loop_step(&upload_queue, &relish_storage, page_server_workdir).await; + }) + }) +} + +async fn upload_loop_step>( + upload_queue: &Mutex>, + relish_storage: &S, + page_server_workdir: &Path, +) { + let mut queue_accessor = upload_queue.lock().unwrap(); + log::debug!("current upload queue length: {}", queue_accessor.len()); + let next_upload = queue_accessor.pop_front(); + drop(queue_accessor); + + let (relish_timeline_id, relish_local_path) = match next_upload { + Some(data) => data, + None => { + // Don't spin and allow others to use the queue. + // In future, could be improved to be more clever about delays depending on relish upload stats + thread::sleep(std::time::Duration::from_secs(1)); + return; + } + }; + + if let Err(e) = upload_relish(relish_storage, page_server_workdir, &relish_local_path).await { + log::error!( + "Failed to upload relish '{}' for timeline {}, reason: {}", + relish_local_path.display(), + relish_timeline_id, + e + ); + upload_queue + .lock() + .unwrap() + .push_back((relish_timeline_id, relish_local_path)) + } else { + log::debug!("Relish successfully uploaded"); + } +} + +async fn upload_relish>( + relish_storage: &S, + page_server_workdir: &Path, + relish_local_path: &Path, +) -> anyhow::Result<()> { + let destination = S::derive_destination(page_server_workdir, relish_local_path)?; + relish_storage + .upload_relish(relish_local_path, &destination) + .await +} diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 40e141360b..f0dad8a91e 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -260,7 +260,6 @@ mod tests { fs::create_dir_all(&repo_dir.join("timelines"))?; let conf = PageServerConf::dummy_conf(repo_dir); - // Make a static copy of the config. This can never be free'd, but that's // OK in a test. let conf: &'static PageServerConf = Box::leak(Box::new(conf));