mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 09:00:37 +00:00
Initial version of the relish storage
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1173,6 +1173,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bookfile",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
@@ -1193,6 +1194,7 @@ dependencies = [
|
||||
"rand",
|
||||
"regex",
|
||||
"routerify",
|
||||
"rust-s3",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -4,8 +4,6 @@ version = "0.1.0"
|
||||
authors = ["Stas Kelvich <stas@zenith.tech>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
bookfile = "^0.3"
|
||||
chrono = "0.4.19"
|
||||
@@ -19,7 +17,7 @@ lazy_static = "1.4.0"
|
||||
log = "0.4.14"
|
||||
clap = "2.33.0"
|
||||
daemonize = "0.4.1"
|
||||
tokio = { version = "1.11", features = ["process", "macros"] }
|
||||
tokio = { version = "1.11", features = ["process", "macros", "fs"] }
|
||||
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
@@ -34,6 +32,8 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
toml = "0.5"
|
||||
scopeguard = "1.1.0"
|
||||
rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
|
||||
async-trait = "0.1"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
|
||||
@@ -15,11 +15,14 @@ use std::{
|
||||
};
|
||||
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType};
|
||||
|
||||
use anyhow::{ensure, Result};
|
||||
use anyhow::{bail, ensure, Result};
|
||||
use clap::{App, Arg, ArgMatches};
|
||||
use daemonize::Daemonize;
|
||||
|
||||
use pageserver::{branches, http, page_service, tenant_mgr, PageServerConf, LOG_FILE_NAME};
|
||||
use pageserver::{
|
||||
branches, http, page_service, tenant_mgr, PageServerConf, RelishStorageConfig, S3Config,
|
||||
LOG_FILE_NAME,
|
||||
};
|
||||
use zenith_utils::http::endpoint;
|
||||
|
||||
/// String arguments that can be declared via CLI or config file
|
||||
@@ -34,6 +37,23 @@ struct CfgFileParams {
|
||||
pg_distrib_dir: Option<String>,
|
||||
auth_validation_public_key_path: Option<String>,
|
||||
auth_type: Option<String>,
|
||||
// see https://github.com/alexcrichton/toml-rs/blob/6c162e6562c3e432bf04c82a3d1d789d80761a86/examples/enum_external.rs for enum deserialisation examples
|
||||
relish_storage: Option<RelishStorage>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
enum RelishStorage {
|
||||
Local {
|
||||
local_path: String,
|
||||
},
|
||||
AwsS3 {
|
||||
bucket_name: String,
|
||||
bucket_region: String,
|
||||
#[serde(skip_serializing)]
|
||||
access_key_id: Option<String>,
|
||||
#[serde(skip_serializing)]
|
||||
secret_access_key: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl CfgFileParams {
|
||||
@@ -43,6 +63,21 @@ impl CfgFileParams {
|
||||
arg_matches.value_of(arg_name).map(str::to_owned)
|
||||
};
|
||||
|
||||
let relish_storage = if let Some(local_path) = get_arg("relish-storage-local-path") {
|
||||
Some(RelishStorage::Local { local_path })
|
||||
} else if let Some((bucket_name, bucket_region)) =
|
||||
get_arg("relish-storage-s3-bucket").zip(get_arg("relish-storage-region"))
|
||||
{
|
||||
Some(RelishStorage::AwsS3 {
|
||||
bucket_name,
|
||||
bucket_region,
|
||||
access_key_id: get_arg("relish-storage-access-key"),
|
||||
secret_access_key: get_arg("relish-storage-secret-access-key"),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Self {
|
||||
listen_pg_addr: get_arg("listen-pg"),
|
||||
listen_http_addr: get_arg("listen-http"),
|
||||
@@ -53,6 +88,7 @@ impl CfgFileParams {
|
||||
pg_distrib_dir: get_arg("postgres-distrib"),
|
||||
auth_validation_public_key_path: get_arg("auth-validation-public-key-path"),
|
||||
auth_type: get_arg("auth-type"),
|
||||
relish_storage,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,6 +107,7 @@ impl CfgFileParams {
|
||||
.auth_validation_public_key_path
|
||||
.or(other.auth_validation_public_key_path),
|
||||
auth_type: self.auth_type.or(other.auth_type),
|
||||
relish_storage: self.relish_storage.or(other.relish_storage),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,7 +161,7 @@ impl CfgFileParams {
|
||||
})?;
|
||||
|
||||
if !pg_distrib_dir.join("bin/postgres").exists() {
|
||||
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
|
||||
bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
|
||||
}
|
||||
|
||||
if auth_type == AuthType::ZenithJWT {
|
||||
@@ -139,6 +176,26 @@ impl CfgFileParams {
|
||||
);
|
||||
}
|
||||
|
||||
let relish_storage_config =
|
||||
self.relish_storage
|
||||
.as_ref()
|
||||
.map(|storage_params| match storage_params.clone() {
|
||||
RelishStorage::Local { local_path } => {
|
||||
RelishStorageConfig::LocalFs(PathBuf::from(local_path))
|
||||
}
|
||||
RelishStorage::AwsS3 {
|
||||
bucket_name,
|
||||
bucket_region,
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
} => RelishStorageConfig::AwsS3(S3Config {
|
||||
bucket_name,
|
||||
bucket_region,
|
||||
access_key_id,
|
||||
secret_access_key,
|
||||
}),
|
||||
});
|
||||
|
||||
Ok(PageServerConf {
|
||||
daemonize: false,
|
||||
|
||||
@@ -157,6 +214,7 @@ impl CfgFileParams {
|
||||
|
||||
auth_validation_public_key_path,
|
||||
auth_type,
|
||||
relish_storage_config,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -248,6 +306,43 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("Authentication scheme type. One of: Trust, MD5, ZenithJWT"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("relish-storage-local-path")
|
||||
.long("relish-storage-local-path")
|
||||
.takes_value(true)
|
||||
.help("Path to the local directory, to be used as an external relish storage")
|
||||
.conflicts_with_all(&[
|
||||
"relish-storage-s3-bucket",
|
||||
"relish-storage-region",
|
||||
"relish-storage-access-key",
|
||||
"relish-storage-secret-access-key",
|
||||
]),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("relish-storage-s3-bucket")
|
||||
.long("relish-storage-s3-bucket")
|
||||
.takes_value(true)
|
||||
.help("Name of the AWS S3 bucket to use an external relish storage")
|
||||
.requires("relish-storage-region"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("relish-storage-region")
|
||||
.long("relish-storage-region")
|
||||
.takes_value(true)
|
||||
.help("Region of the AWS S3 bucket"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("relish-storage-access-key")
|
||||
.long("relish-storage-access-key")
|
||||
.takes_value(true)
|
||||
.help("Credentials to access the AWS S3 bucket"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("relish-storage-secret-access-key")
|
||||
.long("relish-storage-secret-access-key")
|
||||
.takes_value(true)
|
||||
.help("Credentials to access the AWS S3 bucket"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
|
||||
|
||||
@@ -24,7 +24,7 @@ use std::collections::{BTreeSet, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::ops::Bound::Included;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
@@ -33,6 +33,7 @@ use std::{fs, thread};
|
||||
|
||||
use crate::layered_repository::inmemory_layer::FreezeLayers;
|
||||
use crate::relish::*;
|
||||
use crate::relish_storage::storage_uploader::QueueBasedRelishUploader;
|
||||
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
|
||||
use crate::restore_local_repo::import_timeline_wal;
|
||||
use crate::walredo::WalRedoManager;
|
||||
@@ -109,6 +110,7 @@ pub struct LayeredRepository {
|
||||
timelines: Mutex<HashMap<ZTimelineId, Arc<LayeredTimeline>>>,
|
||||
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
relish_uploader: Option<Arc<QueueBasedRelishUploader>>,
|
||||
}
|
||||
|
||||
/// Public interface
|
||||
@@ -139,7 +141,8 @@ impl Repository for LayeredRepository {
|
||||
None,
|
||||
timelineid,
|
||||
self.tenantid,
|
||||
self.walredo_mgr.clone(),
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
self.relish_uploader.as_ref().map(Arc::clone),
|
||||
0,
|
||||
)?;
|
||||
|
||||
@@ -230,7 +233,8 @@ impl LayeredRepository {
|
||||
ancestor,
|
||||
timelineid,
|
||||
self.tenantid,
|
||||
self.walredo_mgr.clone(),
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
self.relish_uploader.as_ref().map(Arc::clone),
|
||||
0, // init with 0 and update after layers are loaded
|
||||
)?;
|
||||
|
||||
@@ -275,6 +279,9 @@ impl LayeredRepository {
|
||||
conf,
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
walredo_mgr,
|
||||
relish_uploader: conf.relish_storage_config.as_ref().map(|config| {
|
||||
Arc::new(QueueBasedRelishUploader::new(config, &conf.workdir).unwrap())
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,7 +356,7 @@ impl LayeredRepository {
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
data: &TimelineMetadata,
|
||||
) -> Result<()> {
|
||||
) -> Result<PathBuf> {
|
||||
let path = conf.timeline_path(&timelineid, &tenantid).join("metadata");
|
||||
let mut file = File::create(&path)?;
|
||||
|
||||
@@ -357,7 +364,7 @@ impl LayeredRepository {
|
||||
|
||||
file.write_all(&TimelineMetadata::ser(data)?)?;
|
||||
|
||||
Ok(())
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
fn load_metadata(
|
||||
@@ -536,6 +543,8 @@ pub struct LayeredTimeline {
|
||||
// WAL redo manager
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
|
||||
|
||||
relish_uploader: Option<Arc<QueueBasedRelishUploader>>,
|
||||
|
||||
// What page versions do we hold in the repository? If we get a
|
||||
// request > last_record_lsn, we need to wait until we receive all
|
||||
// the WAL up to the request. The SeqWait provides functions for
|
||||
@@ -943,6 +952,7 @@ impl LayeredTimeline {
|
||||
/// Open a Timeline handle.
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
metadata: TimelineMetadata,
|
||||
@@ -950,6 +960,7 @@ impl LayeredTimeline {
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
relish_uploader: Option<Arc<QueueBasedRelishUploader>>,
|
||||
current_logical_size: usize,
|
||||
) -> Result<LayeredTimeline> {
|
||||
let timeline = LayeredTimeline {
|
||||
@@ -959,6 +970,7 @@ impl LayeredTimeline {
|
||||
layers: Mutex::new(LayerMap::default()),
|
||||
|
||||
walredo_mgr,
|
||||
relish_uploader,
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
@@ -1317,6 +1329,11 @@ impl LayeredTimeline {
|
||||
drop(layers);
|
||||
let new_historics = frozen.write_to_disk(self)?;
|
||||
layers = self.layers.lock().unwrap();
|
||||
if let Some(relish_uploader) = &self.relish_uploader {
|
||||
for label_path in new_historics.iter().filter_map(|layer| layer.path()) {
|
||||
relish_uploader.schedule_upload(self.timelineid, label_path);
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layers
|
||||
layers.remove_historic(frozen.as_ref());
|
||||
@@ -1353,12 +1370,10 @@ impl LayeredTimeline {
|
||||
// don't remember what the correct value that corresponds to some old
|
||||
// LSN is. But if we flush everything, then the value corresponding
|
||||
// current 'last_record_lsn' is correct and we can store it on disk.
|
||||
let ondisk_prev_record_lsn = {
|
||||
if disk_consistent_lsn == last_record_lsn {
|
||||
Some(prev_record_lsn)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
|
||||
Some(prev_record_lsn)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
|
||||
@@ -1369,7 +1384,11 @@ impl LayeredTimeline {
|
||||
ancestor_timeline: ancestor_timelineid,
|
||||
ancestor_lsn: self.ancestor_lsn,
|
||||
};
|
||||
LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?;
|
||||
let metadata_path =
|
||||
LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?;
|
||||
if let Some(relish_uploader) = &self.relish_uploader {
|
||||
relish_uploader.schedule_upload(self.timelineid, metadata_path);
|
||||
}
|
||||
|
||||
// Also update the in-memory copy
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
|
||||
@@ -161,6 +161,20 @@ impl Layer for DeltaLayer {
|
||||
)
|
||||
}
|
||||
|
||||
fn path(&self) -> Option<PathBuf> {
|
||||
Some(Self::path_for(
|
||||
&self.path_or_conf,
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
&DeltaFileName {
|
||||
seg: self.seg,
|
||||
start_lsn: self.start_lsn,
|
||||
end_lsn: self.end_lsn,
|
||||
dropped: self.dropped,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
/// Look up given page in the cache.
|
||||
fn get_page_reconstruct_data(
|
||||
&self,
|
||||
@@ -271,7 +285,9 @@ impl Layer for DeltaLayer {
|
||||
|
||||
fn delete(&self) -> Result<()> {
|
||||
// delete underlying file
|
||||
fs::remove_file(self.path())?;
|
||||
if let Some(path) = self.path() {
|
||||
fs::remove_file(path)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -321,20 +337,6 @@ impl Layer for DeltaLayer {
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
fn path(&self) -> PathBuf {
|
||||
Self::path_for(
|
||||
&self.path_or_conf,
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
&DeltaFileName {
|
||||
seg: self.seg,
|
||||
start_lsn: self.start_lsn,
|
||||
end_lsn: self.end_lsn,
|
||||
dropped: self.dropped,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn path_for(
|
||||
path_or_conf: &PathOrConf,
|
||||
timelineid: ZTimelineId,
|
||||
@@ -386,7 +388,9 @@ impl DeltaLayer {
|
||||
let mut inner = delta_layer.inner.lock().unwrap();
|
||||
|
||||
// Write the in-memory btreemaps into a file
|
||||
let path = delta_layer.path();
|
||||
let path = delta_layer
|
||||
.path()
|
||||
.expect("DeltaLayer is supposed to have a layer path on disk");
|
||||
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
|
||||
@@ -97,6 +97,18 @@ impl Layer for ImageLayer {
|
||||
)
|
||||
}
|
||||
|
||||
fn path(&self) -> Option<PathBuf> {
|
||||
Some(Self::path_for(
|
||||
&self.path_or_conf,
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
&ImageFileName {
|
||||
seg: self.seg,
|
||||
lsn: self.lsn,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
self.timelineid
|
||||
}
|
||||
@@ -183,7 +195,9 @@ impl Layer for ImageLayer {
|
||||
|
||||
fn delete(&self) -> Result<()> {
|
||||
// delete underlying file
|
||||
fs::remove_file(self.path())?;
|
||||
if let Some(path) = self.path() {
|
||||
fs::remove_file(path)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -214,18 +228,6 @@ impl Layer for ImageLayer {
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
fn path(&self) -> PathBuf {
|
||||
Self::path_for(
|
||||
&self.path_or_conf,
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
&ImageFileName {
|
||||
seg: self.seg,
|
||||
lsn: self.lsn,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn path_for(
|
||||
path_or_conf: &PathOrConf,
|
||||
timelineid: ZTimelineId,
|
||||
@@ -271,8 +273,9 @@ impl ImageLayer {
|
||||
let inner = layer.inner.lock().unwrap();
|
||||
|
||||
// Write the images into a file
|
||||
let path = layer.path();
|
||||
|
||||
let path = layer
|
||||
.path()
|
||||
.expect("ImageLayer is supposed to have a layer path on disk");
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let file = File::create(&path)?;
|
||||
|
||||
@@ -116,6 +116,10 @@ impl Layer for InMemoryLayer {
|
||||
PathBuf::from(format!("inmem-{}", delta_filename))
|
||||
}
|
||||
|
||||
fn path(&self) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
self.timelineid
|
||||
}
|
||||
|
||||
@@ -125,6 +125,10 @@ pub trait Layer: Send + Sync {
|
||||
/// Is the segment represented by this layer dropped by PostgreSQL?
|
||||
fn is_dropped(&self) -> bool;
|
||||
|
||||
/// Gets the physical location of the layer on disk.
|
||||
/// Some layers, such as in-memory, might not have the location.
|
||||
fn path(&self) -> Option<PathBuf>;
|
||||
|
||||
/// Filename used to store this layer on disk. (Even in-memory layers
|
||||
/// implement this, to print a handy unique identifier for the layer for
|
||||
/// log messages, even though they're never not on disk.)
|
||||
|
||||
@@ -13,6 +13,7 @@ pub mod http;
|
||||
pub mod layered_repository;
|
||||
pub mod page_service;
|
||||
pub mod relish;
|
||||
mod relish_storage;
|
||||
pub mod repository;
|
||||
pub mod restore_local_repo;
|
||||
pub mod tenant_mgr;
|
||||
@@ -79,6 +80,7 @@ pub struct PageServerConf {
|
||||
pub auth_type: AuthType,
|
||||
|
||||
pub auth_validation_public_key_path: Option<PathBuf>,
|
||||
pub relish_storage_config: Option<RelishStorageConfig>,
|
||||
}
|
||||
|
||||
impl PageServerConf {
|
||||
@@ -158,6 +160,33 @@ impl PageServerConf {
|
||||
pg_distrib_dir: "".into(),
|
||||
auth_type: AuthType::Trust,
|
||||
auth_validation_public_key_path: None,
|
||||
relish_storage_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// External relish storage configuration, enough for creating a client for that storage.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RelishStorageConfig {
|
||||
/// Root folder to place all stored relish data into.
|
||||
LocalFs(PathBuf),
|
||||
AwsS3(S3Config),
|
||||
}
|
||||
|
||||
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
|
||||
#[derive(Clone)]
|
||||
pub struct S3Config {
|
||||
pub bucket_name: String,
|
||||
pub bucket_region: String,
|
||||
pub access_key_id: Option<String>,
|
||||
pub secret_access_key: Option<String>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for S3Config {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("S3Config")
|
||||
.field("bucket_name", &self.bucket_name)
|
||||
.field("bucket_region", &self.bucket_region)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
54
pageserver/src/relish_storage.rs
Normal file
54
pageserver/src/relish_storage.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
//! Abstractions for the page server to store its relish layer data in the external storage.
|
||||
//!
|
||||
//! Main purpose of this module subtree is to provide a set of abstractions to manage the storage state
|
||||
//! in a way, optimal for page server.
|
||||
//!
|
||||
//! The abstractions hide multiple custom external storage API implementations,
|
||||
//! such as AWS S3, local filesystem, etc., located in the submodules.
|
||||
|
||||
mod local_fs;
|
||||
mod rust_s3;
|
||||
/// A queue and the background machinery behind it to upload
|
||||
/// local page server layer files to external storage.
|
||||
pub mod storage_uploader;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
#[async_trait::async_trait]
|
||||
pub trait RelishStorage: Send + Sync {
|
||||
type RelishStoragePath;
|
||||
|
||||
fn derive_destination(
|
||||
page_server_workdir: &Path,
|
||||
relish_local_path: &Path,
|
||||
) -> anyhow::Result<Self::RelishStoragePath>;
|
||||
|
||||
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>>;
|
||||
|
||||
async fn download_relish(
|
||||
&self,
|
||||
from: &Self::RelishStoragePath,
|
||||
to: &Path,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()>;
|
||||
|
||||
async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
fn strip_workspace_prefix<'a>(
|
||||
page_server_workdir: &'a Path,
|
||||
relish_local_path: &'a Path,
|
||||
) -> anyhow::Result<&'a Path> {
|
||||
relish_local_path
|
||||
.strip_prefix(page_server_workdir)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Unexpected: relish local path '{}' is not relevant to server workdir",
|
||||
relish_local_path.display(),
|
||||
)
|
||||
})
|
||||
}
|
||||
158
pageserver/src/relish_storage/local_fs.rs
Normal file
158
pageserver/src/relish_storage/local_fs.rs
Normal file
@@ -0,0 +1,158 @@
|
||||
//! Local filesystem relish storage.
|
||||
//!
|
||||
//! Page server already stores layer data on the server, when freezing it.
|
||||
//! This storage serves a way to
|
||||
//!
|
||||
//! * test things locally simply
|
||||
//! * allow to compabre both binary sets
|
||||
//! * help validating the relish storage API
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
|
||||
use super::{strip_workspace_prefix, RelishStorage};
|
||||
|
||||
pub struct LocalFs {
|
||||
root: PathBuf,
|
||||
}
|
||||
|
||||
impl LocalFs {
|
||||
/// Atetmpts to create local FS relish storage, also creates the directory provided, if not exists.
|
||||
pub fn new(root: PathBuf) -> anyhow::Result<Self> {
|
||||
if !root.exists() {
|
||||
std::fs::create_dir_all(&root).with_context(|| {
|
||||
format!(
|
||||
"Failed to create all directories in the given root path {}",
|
||||
root.display(),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
Ok(Self { root })
|
||||
}
|
||||
|
||||
fn resolve_in_storage(&self, path: &Path) -> anyhow::Result<PathBuf> {
|
||||
if path.is_relative() {
|
||||
Ok(self.root.join(path))
|
||||
} else if path.starts_with(&self.root) {
|
||||
Ok(path.to_path_buf())
|
||||
} else {
|
||||
bail!(
|
||||
"Path '{}' does not belong to the current storage",
|
||||
path.display()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RelishStorage for LocalFs {
|
||||
type RelishStoragePath = PathBuf;
|
||||
|
||||
fn derive_destination(
|
||||
page_server_workdir: &Path,
|
||||
relish_local_path: &Path,
|
||||
) -> anyhow::Result<Self::RelishStoragePath> {
|
||||
Ok(strip_workspace_prefix(page_server_workdir, relish_local_path)?.to_path_buf())
|
||||
}
|
||||
|
||||
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>> {
|
||||
Ok(get_all_files(&self.root).await?.into_iter().collect())
|
||||
}
|
||||
|
||||
async fn download_relish(
|
||||
&self,
|
||||
from: &Self::RelishStoragePath,
|
||||
to: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let file_path = self.resolve_in_storage(from)?;
|
||||
if file_path.exists() && file_path.is_file() {
|
||||
create_target_directory(to).await?;
|
||||
tokio::fs::copy(file_path, to).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
bail!(
|
||||
"File '{}' either does not exist or is not a file",
|
||||
file_path.display()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()> {
|
||||
let file_path = self.resolve_in_storage(path)?;
|
||||
if file_path.exists() && file_path.is_file() {
|
||||
Ok(tokio::fs::remove_file(file_path).await?)
|
||||
} else {
|
||||
bail!(
|
||||
"File '{}' either does not exist or is not a file",
|
||||
file_path.display()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> {
|
||||
let target_file_path = self.resolve_in_storage(to)?;
|
||||
create_target_directory(&target_file_path).await?;
|
||||
|
||||
tokio::fs::copy(&from, &target_file_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to upload relish '{}' to local storage",
|
||||
from.display(),
|
||||
)
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_all_files<'a, P>(
|
||||
directory_path: P,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
|
||||
where
|
||||
P: AsRef<Path> + Send + Sync + 'a,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let directory_path = directory_path.as_ref();
|
||||
if directory_path.exists() {
|
||||
if directory_path.is_dir() {
|
||||
let mut paths = Vec::new();
|
||||
let mut dir_contents = tokio::fs::read_dir(directory_path).await?;
|
||||
while let Some(dir_entry) = dir_contents.next_entry().await? {
|
||||
let file_type = dir_entry.file_type().await?;
|
||||
let entry_path = dir_entry.path();
|
||||
if file_type.is_symlink() {
|
||||
log::debug!("{:?} us a symlink, skipping", entry_path)
|
||||
} else if file_type.is_dir() {
|
||||
paths.extend(get_all_files(entry_path).await?.into_iter())
|
||||
} else {
|
||||
paths.push(dir_entry.path());
|
||||
}
|
||||
}
|
||||
Ok(paths)
|
||||
} else {
|
||||
bail!("Path '{}' is not a directory", directory_path.display())
|
||||
}
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> {
|
||||
let target_dir = match target_file_path.parent() {
|
||||
Some(parent_dir) => parent_dir,
|
||||
None => bail!(
|
||||
"Relish path '{}' has no parent directory",
|
||||
target_file_path.display()
|
||||
),
|
||||
};
|
||||
if !target_dir.exists() {
|
||||
tokio::fs::create_dir_all(target_dir).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
144
pageserver/src/relish_storage/rust_s3.rs
Normal file
144
pageserver/src/relish_storage/rust_s3.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
//! A wrapper around AWS S3 client library `rust_s3` to be used a relish storage.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use s3::{bucket::Bucket, creds::Credentials, region::Region};
|
||||
|
||||
use crate::{relish_storage::strip_workspace_prefix, S3Config};
|
||||
|
||||
use super::RelishStorage;
|
||||
|
||||
const S3_FILE_SEPARATOR: char = '/';
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct S3ObjectKey(String);
|
||||
|
||||
impl S3ObjectKey {
|
||||
fn key(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// AWS S3 relish storage.
|
||||
pub struct RustS3 {
|
||||
bucket: Bucket,
|
||||
}
|
||||
|
||||
impl RustS3 {
|
||||
/// Creates the relish storage, errors if incorrect AWS S3 configuration provided.
|
||||
pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
|
||||
let region = aws_config
|
||||
.bucket_region
|
||||
.parse::<Region>()
|
||||
.context("Failed to parse the s3 region from config")?;
|
||||
let credentials = Credentials::new(
|
||||
aws_config.access_key_id.as_deref(),
|
||||
aws_config.secret_access_key.as_deref(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.context("Failed to create the s3 credentials")?;
|
||||
Ok(Self {
|
||||
bucket: Bucket::new_with_path_style(
|
||||
aws_config.bucket_name.as_str(),
|
||||
region,
|
||||
credentials,
|
||||
)
|
||||
.context("Failed to create the s3 bucket")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RelishStorage for RustS3 {
|
||||
type RelishStoragePath = S3ObjectKey;
|
||||
|
||||
fn derive_destination(
|
||||
page_server_workdir: &Path,
|
||||
relish_local_path: &Path,
|
||||
) -> anyhow::Result<Self::RelishStoragePath> {
|
||||
let relative_path = strip_workspace_prefix(page_server_workdir, relish_local_path)?;
|
||||
let mut key = String::new();
|
||||
for segment in relative_path {
|
||||
key.push(S3_FILE_SEPARATOR);
|
||||
key.push_str(&segment.to_string_lossy());
|
||||
}
|
||||
Ok(S3ObjectKey(key))
|
||||
}
|
||||
|
||||
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>> {
|
||||
let list_response = self
|
||||
.bucket
|
||||
.list(String::new(), None)
|
||||
.await
|
||||
.context("Failed to list s3 objects")?;
|
||||
|
||||
Ok(list_response
|
||||
.into_iter()
|
||||
.flat_map(|response| response.contents)
|
||||
.map(|s3_object| S3ObjectKey(s3_object.key))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn download_relish(
|
||||
&self,
|
||||
from: &Self::RelishStoragePath,
|
||||
to: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut target_file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.open(to)
|
||||
.with_context(|| format!("Failed to open target s3 destination at {}", to.display()))?;
|
||||
let code = self
|
||||
.bucket
|
||||
.get_object_stream(from.key(), &mut target_file)
|
||||
.await
|
||||
.with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
|
||||
if code != 200 {
|
||||
Err(anyhow::format_err!(
|
||||
"Received non-200 exit code during downloading object from directory, code: {}",
|
||||
code
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()> {
|
||||
let (_, code) = self
|
||||
.bucket
|
||||
.delete_object(path.key())
|
||||
.await
|
||||
.with_context(|| format!("Failed to delete s3 object with key {}", path.key()))?;
|
||||
if code != 200 {
|
||||
Err(anyhow::format_err!(
|
||||
"Received non-200 exit code during deleting object with key '{}', code: {}",
|
||||
path.key(),
|
||||
code
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload_relish(&self, from: &Path, to: &Self::RelishStoragePath) -> anyhow::Result<()> {
|
||||
let mut local_file = tokio::fs::OpenOptions::new().read(true).open(from).await?;
|
||||
|
||||
let code = self
|
||||
.bucket
|
||||
.put_object_stream(&mut local_file, to.key())
|
||||
.await
|
||||
.with_context(|| format!("Failed to create s3 object with key {}", to.key()))?;
|
||||
if code != 200 {
|
||||
Err(anyhow::format_err!(
|
||||
"Received non-200 exit code during creating object with key '{}', code: {}",
|
||||
to.key(),
|
||||
code
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
116
pageserver/src/relish_storage/storage_uploader.rs
Normal file
116
pageserver/src/relish_storage/storage_uploader.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, Mutex},
|
||||
thread,
|
||||
};
|
||||
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
|
||||
use crate::{relish_storage::RelishStorage, RelishStorageConfig};
|
||||
|
||||
use super::{local_fs::LocalFs, rust_s3::RustS3};
|
||||
|
||||
pub struct QueueBasedRelishUploader {
|
||||
upload_queue: Arc<Mutex<VecDeque<(ZTimelineId, PathBuf)>>>,
|
||||
}
|
||||
|
||||
impl QueueBasedRelishUploader {
|
||||
pub fn new(
|
||||
config: &RelishStorageConfig,
|
||||
page_server_workdir: &'static Path,
|
||||
) -> anyhow::Result<Self> {
|
||||
let upload_queue = Arc::new(Mutex::new(VecDeque::new()));
|
||||
let _handle = match config {
|
||||
RelishStorageConfig::LocalFs(root) => {
|
||||
let relish_storage = LocalFs::new(root.clone())?;
|
||||
create_upload_thread(
|
||||
Arc::clone(&upload_queue),
|
||||
relish_storage,
|
||||
page_server_workdir,
|
||||
)?
|
||||
}
|
||||
RelishStorageConfig::AwsS3(s3_config) => {
|
||||
let relish_storage = RustS3::new(s3_config)?;
|
||||
create_upload_thread(
|
||||
Arc::clone(&upload_queue),
|
||||
relish_storage,
|
||||
page_server_workdir,
|
||||
)?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self { upload_queue })
|
||||
}
|
||||
|
||||
pub fn schedule_upload(&self, timeline_id: ZTimelineId, relish_path: PathBuf) {
|
||||
self.upload_queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push_back((timeline_id, relish_path))
|
||||
}
|
||||
}
|
||||
|
||||
fn create_upload_thread<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
|
||||
upload_queue: Arc<Mutex<VecDeque<(ZTimelineId, PathBuf)>>>,
|
||||
relish_storage: S,
|
||||
page_server_workdir: &'static Path,
|
||||
) -> std::io::Result<thread::JoinHandle<()>> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
thread::Builder::new()
|
||||
.name("Queue based relish uploader".to_string())
|
||||
.spawn(move || loop {
|
||||
runtime.block_on(async {
|
||||
upload_loop_step(&upload_queue, &relish_storage, page_server_workdir).await;
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload_loop_step<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
|
||||
upload_queue: &Mutex<VecDeque<(ZTimelineId, PathBuf)>>,
|
||||
relish_storage: &S,
|
||||
page_server_workdir: &Path,
|
||||
) {
|
||||
let mut queue_accessor = upload_queue.lock().unwrap();
|
||||
log::debug!("current upload queue length: {}", queue_accessor.len());
|
||||
let next_upload = queue_accessor.pop_front();
|
||||
drop(queue_accessor);
|
||||
|
||||
let (relish_timeline_id, relish_local_path) = match next_upload {
|
||||
Some(data) => data,
|
||||
None => {
|
||||
// Don't spin and allow others to use the queue.
|
||||
// In future, could be improved to be more clever about delays depending on relish upload stats
|
||||
thread::sleep(std::time::Duration::from_secs(1));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = upload_relish(relish_storage, page_server_workdir, &relish_local_path).await {
|
||||
log::error!(
|
||||
"Failed to upload relish '{}' for timeline {}, reason: {}",
|
||||
relish_local_path.display(),
|
||||
relish_timeline_id,
|
||||
e
|
||||
);
|
||||
upload_queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push_back((relish_timeline_id, relish_local_path))
|
||||
} else {
|
||||
log::debug!("Relish successfully uploaded");
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload_relish<P, S: RelishStorage<RelishStoragePath = P>>(
|
||||
relish_storage: &S,
|
||||
page_server_workdir: &Path,
|
||||
relish_local_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let destination = S::derive_destination(page_server_workdir, relish_local_path)?;
|
||||
relish_storage
|
||||
.upload_relish(relish_local_path, &destination)
|
||||
.await
|
||||
}
|
||||
@@ -260,7 +260,6 @@ mod tests {
|
||||
fs::create_dir_all(&repo_dir.join("timelines"))?;
|
||||
|
||||
let conf = PageServerConf::dummy_conf(repo_dir);
|
||||
|
||||
// Make a static copy of the config. This can never be free'd, but that's
|
||||
// OK in a test.
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
Reference in New Issue
Block a user