diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs
index 2278509c1f..3d7ed8c360 100644
--- a/compute_tools/src/extension_server.rs
+++ b/compute_tools/src/extension_server.rs
@@ -78,7 +78,7 @@ use regex::Regex;
 use remote_storage::*;
 use serde_json;
 use std::io::Read;
-use std::num::NonZeroUsize;
+use std::num::{NonZeroU32, NonZeroUsize};
 use std::path::Path;
 use std::str;
 use tar::Archive;
@@ -281,6 +281,8 @@ pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs
@@ ... @@
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        unimplemented!()
-    }
 }
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index f87747cc7b..435364d83a 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -12,7 +12,13 @@ mod local_fs;
 mod s3_bucket;
 mod simulate_failures;
 
-use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc};
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    num::{NonZeroU32, NonZeroUsize},
+    pin::Pin,
+    sync::Arc,
+};
 
 use anyhow::{bail, Context};
 use camino::{Utf8Path, Utf8PathBuf};
@@ -28,6 +34,12 @@ pub use self::{
 };
 use s3_bucket::RequestKind;
 
+/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
+/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
+/// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach.
+/// Both cases may trigger timeline download, that might download a lot of layers. This concurrency is limited by the clients internally, if needed.
+pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50;
+pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// Currently, sync happens with AWS S3, that has two limits on requests per second:
 /// ~200 RPS for IAM services
 ///
@@ -100,7 +112,7 @@ impl RemotePath {
         self.0.file_name()
     }
 
-    pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
+    pub fn join(&self, segment: &Utf8Path) -> Self {
         Self(self.0.join(segment))
     }
 
@@ -171,8 +183,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
 
     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()>;
 }
 
 pub struct Download {
@@ -318,15 +328,6 @@ impl GenericRemoteStorage {
             Self::Unreliable(s) => s.delete_objects(paths).await,
         }
     }
-
-    pub async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        match self {
-            Self::LocalFs(s) => s.copy_object(src, dst).await,
-            Self::AwsS3(s) => s.copy_object(src, dst).await,
-            Self::AzureBlob(s) => s.copy_object(src, dst).await,
-            Self::Unreliable(s) => s.copy_object(src, dst).await,
-        }
-    }
 }
 
 impl GenericRemoteStorage {
@@ -393,6 +394,10 @@ pub struct StorageMetadata(HashMap<String, String>);
 /// External backup storage configuration, enough for creating a client for that storage.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct RemoteStorageConfig {
+    /// Max allowed number of concurrent sync operations between the API user and the remote storage.
+    pub max_concurrent_syncs: NonZeroUsize,
+    /// Max allowed errors before the sync task is considered failed and evicted.
+    pub max_sync_errors: NonZeroU32,
     /// The storage connection configuration.
     pub storage: RemoteStorageKind,
 }
 
@@ -488,6 +493,18 @@ impl RemoteStorageConfig {
 
         let use_azure = container_name.is_some() && container_region.is_some();
 
+        let max_concurrent_syncs = NonZeroUsize::new(
+            parse_optional_integer("max_concurrent_syncs", toml)?
+                .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
+        )
+        .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
+
+        let max_sync_errors = NonZeroU32::new(
+            parse_optional_integer("max_sync_errors", toml)?
+                .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
+        )
+        .context("Failed to parse 'max_sync_errors' as a positive integer")?;
+
         let default_concurrency_limit = if use_azure {
             DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
         } else {
@@ -569,7 +586,11 @@ impl RemoteStorageConfig {
             }
         };
 
-        Ok(Some(RemoteStorageConfig { storage }))
+        Ok(Some(RemoteStorageConfig {
+            max_concurrent_syncs,
+            max_sync_errors,
+            storage,
+        }))
     }
 }
 
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index f224b78fcd..3d32b6b631 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -371,27 +371,6 @@ impl RemoteStorage for LocalFs {
         }
         Ok(())
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        let src_path = src.with_base(&self.storage_root);
-        let dst_path = dst.with_base(&self.storage_root);
-
-        // If the destination file already exists, we need to delete it first.
-        if dst_path.exists() {
-            fs::remove_file(&dst_path).await?;
-        }
-
-        // Copy the file.
-        fs::copy(&src_path, &dst_path).await?;
-
-        // Copy the metadata.
-        let metadata_path = storage_metadata_path(&src_path);
-        if metadata_path.exists() {
-            fs::copy(&metadata_path, storage_metadata_path(&dst_path)).await?;
-        }
-
-        Ok(())
-    }
 }
 
 fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index d78b6f46cf..fc94281666 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -221,8 +221,6 @@ impl S3Bucket {
             )),
         }
     }
-
-
 }
 
 pin_project_lite::pin_project! {
@@ -548,11 +546,6 @@ impl RemoteStorage for S3Bucket {
         let paths = std::array::from_ref(path);
         self.delete_objects(paths).await
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        unimplemented!()
-    }
-
 }
 
 /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
@@ -635,6 +628,4 @@ mod tests {
             }
         }
     }
-
-
 }
diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs
index 7e60aa5f34..6d6a5c1d24 100644
--- a/libs/remote_storage/src/simulate_failures.rs
+++ b/libs/remote_storage/src/simulate_failures.rs
@@ -149,8 +149,4 @@ impl RemoteStorage for UnreliableWrapper {
         }
         Ok(())
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        unimplemented!()
-    }
 }
diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs
index 5649fc08d1..0338270aaf 100644
--- a/libs/remote_storage/tests/test_real_azure.rs
+++ b/libs/remote_storage/tests/test_real_azure.rs
@@ -469,6 +469,8 @@ fn create_azure_client(
     let random = rand::thread_rng().gen::();
 
     let remote_storage_config = RemoteStorageConfig {
+        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
+        max_sync_errors: NonZeroU32::new(5).unwrap(),
         storage: RemoteStorageKind::AzureContainer(AzureConfig {
             container_name: remote_storage_azure_container,
             container_region: remote_storage_azure_region,
diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs
index 6baa0a6a62..7e2aa9f6d7 100644
--- a/libs/remote_storage/tests/test_real_s3.rs
+++ b/libs/remote_storage/tests/test_real_s3.rs
@@ -396,6 +396,8 @@ fn create_s3_client(
     let random = rand::thread_rng().gen::();
 
     let remote_storage_config = RemoteStorageConfig {
+        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
+        max_sync_errors: NonZeroU32::new(5).unwrap(),
         storage: RemoteStorageKind::AwsS3(S3Config {
             bucket_name: remote_storage_s3_bucket,
             bucket_region: remote_storage_s3_region,
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 2b43b015bf..fe62a7299a 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -1320,6 +1320,12 @@ broker_endpoint = '{broker_endpoint}'
         assert_eq!(
             parsed_remote_storage_config,
             RemoteStorageConfig {
+                max_concurrent_syncs: NonZeroUsize::new(
+                    remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS
+                )
+                .unwrap(),
+                max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
+                    .unwrap(),
                 storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
             },
             "Remote storage config should correctly parse the local FS config and fill other storage defaults"
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 4bbae2bf36..22efa23f10 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -892,6 +892,14 @@ mod test {
         std::fs::create_dir_all(remote_fs_dir)?;
         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
         let storage_config = RemoteStorageConfig {
+            max_concurrent_syncs: std::num::NonZeroUsize::new(
+                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
+            )
+            .unwrap(),
+            max_sync_errors: std::num::NonZeroU32::new(
+                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
+            )
+            .unwrap(),
             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
         };
         let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 27f7230603..f8895c32dc 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3728,6 +3728,10 @@ pub(crate) mod harness {
         let remote_fs_dir = conf.workdir.join("localfs");
         std::fs::create_dir_all(&remote_fs_dir).unwrap();
         let config = RemoteStorageConfig {
+            // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
+            max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
+            // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
+            max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
         };
         let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
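
A minimal usage sketch of the restored fields, modeled on the test harnesses in this diff. The constants, types, and field names all come from the diff itself; the helper name and storage root are illustrative, and GenericRemoteStorage::from_config is assumed to return anyhow::Result, as the tests' .unwrap() calls suggest. In TOML configs the corresponding keys, max_concurrent_syncs and max_sync_errors, are optional and fall back to the crate defaults (50 and 10) when absent.

    use std::num::{NonZeroU32, NonZeroUsize};

    use camino::Utf8PathBuf;
    use remote_storage::{
        DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
        GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind,
    };

    /// Build a local-FS client using the same fallbacks the TOML parser
    /// applies when the `max_concurrent_syncs`/`max_sync_errors` keys are absent.
    fn local_storage_with_defaults(root: Utf8PathBuf) -> anyhow::Result<GenericRemoteStorage> {
        let config = RemoteStorageConfig {
            max_concurrent_syncs: NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS)
                .expect("default is non-zero"),
            max_sync_errors: NonZeroU32::new(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
                .expect("default is non-zero"),
            storage: RemoteStorageKind::LocalFs(root),
        };
        GenericRemoteStorage::from_config(&config)
    }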