diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index a1697f0f4f..f117ec1fdf 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -20,8 +20,12 @@ use daemonize::Daemonize;
 
 use pageserver::{
     branches,
-    defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR},
-    http, page_service, tenant_mgr, PageServerConf, RelishStorageConfig, S3Config, LOG_FILE_NAME,
+    defaults::{
+        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR,
+        DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS,
+    },
+    http, page_service, tenant_mgr, PageServerConf, RelishStorageConfig, RelishStorageKind,
+    S3Config, LOG_FILE_NAME,
 };
 use zenith_utils::http::endpoint;
 
@@ -41,6 +45,7 @@ struct CfgFileParams {
     auth_type: Option<String>,
     // see https://github.com/alexcrichton/toml-rs/blob/6c162e6562c3e432bf04c82a3d1d789d80761a86/examples/enum_external.rs for enum deserialisation examples
     relish_storage: Option<RelishStorage>,
+    relish_storage_max_concurrent_sync: Option<String>,
 }
 
 #[derive(Serialize, Deserialize, Clone)]
@@ -91,6 +96,7 @@ impl CfgFileParams {
             auth_validation_public_key_path: get_arg("auth-validation-public-key-path"),
             auth_type: get_arg("auth-type"),
             relish_storage,
+            relish_storage_max_concurrent_sync: get_arg("relish-storage-max-concurrent-sync"),
         }
     }
 
@@ -110,6 +116,9 @@ impl CfgFileParams {
                 .or(other.auth_validation_public_key_path),
             auth_type: self.auth_type.or(other.auth_type),
             relish_storage: self.relish_storage.or(other.relish_storage),
+            relish_storage_max_concurrent_sync: self
+                .relish_storage_max_concurrent_sync
+                .or(other.relish_storage_max_concurrent_sync),
         }
     }
 
@@ -178,25 +187,34 @@ impl CfgFileParams {
             );
         }
 
-        let relish_storage_config =
-            self.relish_storage
-                .as_ref()
-                .map(|storage_params| match storage_params.clone() {
-                    RelishStorage::Local { local_path } => {
-                        RelishStorageConfig::LocalFs(PathBuf::from(local_path))
-                    }
-                    RelishStorage::AwsS3 {
-                        bucket_name,
-                        bucket_region,
-                        access_key_id,
-                        secret_access_key,
-                    } => RelishStorageConfig::AwsS3(S3Config {
-                        bucket_name,
-                        bucket_region,
-                        access_key_id,
-                        secret_access_key,
-                    }),
-                });
+        let max_concurrent_sync = match self.relish_storage_max_concurrent_sync.as_deref() {
+            Some(relish_storage_max_concurrent_sync) => {
+                relish_storage_max_concurrent_sync.parse()?
+            }
+            None => DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS,
+        };
+        let relish_storage_config = self.relish_storage.as_ref().map(|storage_params| {
+            let storage = match storage_params.clone() {
+                RelishStorage::Local { local_path } => {
+                    RelishStorageKind::LocalFs(PathBuf::from(local_path))
+                }
+                RelishStorage::AwsS3 {
+                    bucket_name,
+                    bucket_region,
+                    access_key_id,
+                    secret_access_key,
+                } => RelishStorageKind::AwsS3(S3Config {
+                    bucket_name,
+                    bucket_region,
+                    access_key_id,
+                    secret_access_key,
+                }),
+            };
+            RelishStorageConfig {
+                max_concurrent_sync,
+                storage,
+            }
+        });
 
         Ok(PageServerConf {
             daemonize: false,
@@ -345,6 +363,12 @@ fn main() -> Result<()> {
                 .takes_value(true)
                 .help("Credentials to access the AWS S3 bucket"),
         )
+        .arg(
+            Arg::with_name("relish-storage-max-concurrent-sync")
+                .long("relish-storage-max-concurrent-sync")
+                .takes_value(true)
+                .help("Maximum allowed concurrent synchronisations with storage"),
+        )
         .get_matches();
 
     let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs
index cc1ca464ad..23af734060 100644
--- a/pageserver/src/branches.rs
+++ b/pageserver/src/branches.rs
@@ -155,7 +155,7 @@ pub fn create_repo(
     // Load data into pageserver
     // TODO To implement zenith import we need to
     //      move data loading out of create_repo()
-    bootstrap_timeline(conf, tenantid, tli, &*repo)?;
+    bootstrap_timeline(conf, tenantid, tli, repo.as_ref())?;
 
     Ok(repo)
 }
@@ -221,7 +221,11 @@ fn bootstrap_timeline(
     // Import the contents of the data directory at the initial checkpoint
     // LSN, and any WAL after that.
     let timeline = repo.create_empty_timeline(tli)?;
-    restore_local_repo::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
+    restore_local_repo::import_timeline_from_postgres_datadir(
+        &pgdata_path,
+        timeline.as_ref(),
+        lsn,
+    )?;
     timeline.checkpoint()?;
 
     println!(
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 2da82c5bde..a45d2e249e 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -13,7 +13,7 @@ pub mod http;
 pub mod layered_repository;
 pub mod page_service;
 pub mod relish;
-mod relish_storage;
+pub mod relish_storage;
 pub mod repository;
 pub mod restore_local_repo;
 pub mod tenant_mgr;
@@ -40,6 +40,7 @@ pub mod defaults {
     pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
 
     pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
+    pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
 }
 
 lazy_static! {
@@ -168,18 +169,37 @@ impl PageServerConf {
 
 /// External relish storage configuration, enough for creating a client for that storage.
 #[derive(Debug, Clone)]
-pub enum RelishStorageConfig {
-    /// Root folder to place all stored relish data into.
+pub struct RelishStorageConfig {
+    /// Limits the number of concurrent sync operations between pageserver and relish storage.
+    pub max_concurrent_sync: usize,
+    /// The storage connection configuration.
+    pub storage: RelishStorageKind,
+}
+
+/// A kind of a relish storage to connect to, with its connection configuration.
+#[derive(Debug, Clone)]
+pub enum RelishStorageKind {
+    /// Storage based on local file system.
+    /// Specify a root folder to place all stored relish data into.
     LocalFs(PathBuf),
+    /// AWS S3 based storage, storing all relishes into the root
+    /// of the S3 bucket from the config.
     AwsS3(S3Config),
 }
 
 /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
 #[derive(Clone)]
 pub struct S3Config {
+    /// Name of the bucket to connect to.
     pub bucket_name: String,
+    /// The region where the bucket is located at.
     pub bucket_region: String,
+    /// "Login" to use when connecting to bucket.
+    /// Can be empty for cases like AWS k8s IAM
+    /// where we can allow certain pods to connect
+    /// to the bucket directly without any credentials.
     pub access_key_id: Option<String>,
+    /// "Password" to use when connecting to bucket.
    pub secret_access_key: Option<String>,
 }
 
diff --git a/pageserver/src/relish_storage/storage_uploader.rs b/pageserver/src/relish_storage/storage_uploader.rs
index 2d0889173d..d597cc04ac 100644
--- a/pageserver/src/relish_storage/storage_uploader.rs
+++ b/pageserver/src/relish_storage/storage_uploader.rs
@@ -7,7 +7,7 @@ use std::{
 
 use zenith_utils::zid::ZTimelineId;
 
-use crate::{relish_storage::RelishStorage, RelishStorageConfig};
+use crate::{relish_storage::RelishStorage, RelishStorageConfig, RelishStorageKind};
 
 use super::{local_fs::LocalFs, rust_s3::RustS3};
 
@@ -21,8 +21,8 @@ impl QueueBasedRelishUploader {
         page_server_workdir: &'static Path,
     ) -> anyhow::Result<Self> {
         let upload_queue = Arc::new(Mutex::new(VecDeque::new()));
-        let _handle = match config {
-            RelishStorageConfig::LocalFs(root) => {
+        let _handle = match &config.storage {
+            RelishStorageKind::LocalFs(root) => {
                 let relish_storage = LocalFs::new(root.clone())?;
                 create_upload_thread(
                     Arc::clone(&upload_queue),
                     relish_storage,
                     page_server_workdir,
                 )?
             }
-            RelishStorageConfig::AwsS3(s3_config) => {
+            RelishStorageKind::AwsS3(s3_config) => {
                 let relish_storage = RustS3::new(s3_config)?;
                 create_upload_thread(
                     Arc::clone(&upload_queue),