From 7553bbe3f5926a12dbb8090403af91dc5f5f688a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 26 Oct 2023 15:44:03 +0000 Subject: [PATCH] WIP cleanup unused RemoteStorage fields + half-baked copy_file impl --- compute_tools/src/extension_server.rs | 4 +- libs/remote_storage/src/azure_blob.rs | 4 ++ libs/remote_storage/src/lib.rs | 49 ++++++-------------- libs/remote_storage/src/local_fs.rs | 21 +++++++++ libs/remote_storage/src/s3_bucket.rs | 9 ++++ libs/remote_storage/src/simulate_failures.rs | 4 ++ libs/remote_storage/tests/test_real_azure.rs | 2 - libs/remote_storage/tests/test_real_s3.rs | 2 - pageserver/src/config.rs | 6 --- pageserver/src/deletion_queue.rs | 8 ---- pageserver/src/tenant.rs | 4 -- 11 files changed, 53 insertions(+), 60 deletions(-) diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 3d7ed8c360..2278509c1f 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -78,7 +78,7 @@ use regex::Regex; use remote_storage::*; use serde_json; use std::io::Read; -use std::num::{NonZeroU32, NonZeroUsize}; +use std::num::NonZeroUsize; use std::path::Path; use std::str; use tar::Archive; @@ -281,8 +281,6 @@ pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result anyhow::Result<()> { + unimplemented!() + } } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 435364d83a..f87747cc7b 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -12,13 +12,7 @@ mod local_fs; mod s3_bucket; mod simulate_failures; -use std::{ - collections::HashMap, - fmt::Debug, - num::{NonZeroU32, NonZeroUsize}, - pin::Pin, - sync::Arc, -}; +use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc}; use anyhow::{bail, Context}; use camino::{Utf8Path, Utf8PathBuf}; @@ -34,12 +28,6 @@ pub use self::{ }; use s3_bucket::RequestKind; -/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage. -/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency -/// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach. -/// Both cases may trigger timeline download, that might download a lot of layers. This concurrency is limited by the clients internally, if needed. -pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50; -pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// Currently, sync happens with AWS S3, that has two limits on requests per second: /// ~200 RPS for IAM services /// @@ -112,7 +100,7 @@ impl RemotePath { self.0.file_name() } - pub fn join(&self, segment: &Utf8Path) -> Self { + pub fn join>(&self, segment: P) -> Self { Self(self.0.join(segment)) } @@ -183,6 +171,8 @@ pub trait RemoteStorage: Send + Sync + 'static { async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>; async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>; + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()>; } pub struct Download { @@ -328,6 +318,15 @@ impl GenericRemoteStorage { Self::Unreliable(s) => s.delete_objects(paths).await, } } + + pub async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.copy_object(src, dst).await, + Self::AwsS3(s) => s.copy_object(src, dst).await, + Self::AzureBlob(s) => s.copy_object(src, dst).await, + Self::Unreliable(s) => s.copy_object(src, dst).await, + } + } } impl GenericRemoteStorage { @@ -394,10 +393,6 @@ pub struct StorageMetadata(HashMap); /// External backup storage configuration, enough for creating a client for that storage. #[derive(Debug, Clone, PartialEq, Eq)] pub struct RemoteStorageConfig { - /// Max allowed number of concurrent sync operations between the API user and the remote storage. - pub max_concurrent_syncs: NonZeroUsize, - /// Max allowed errors before the sync task is considered failed and evicted. - pub max_sync_errors: NonZeroU32, /// The storage connection configuration. pub storage: RemoteStorageKind, } @@ -493,18 +488,6 @@ impl RemoteStorageConfig { let use_azure = container_name.is_some() && container_region.is_some(); - let max_concurrent_syncs = NonZeroUsize::new( - parse_optional_integer("max_concurrent_syncs", toml)? - .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS), - ) - .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?; - - let max_sync_errors = NonZeroU32::new( - parse_optional_integer("max_sync_errors", toml)? - .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS), - ) - .context("Failed to parse 'max_sync_errors' as a positive integer")?; - let default_concurrency_limit = if use_azure { DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT } else { @@ -586,11 +569,7 @@ impl RemoteStorageConfig { } }; - Ok(Some(RemoteStorageConfig { - max_concurrent_syncs, - max_sync_errors, - storage, - })) + Ok(Some(RemoteStorageConfig { storage })) } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 3d32b6b631..f224b78fcd 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -371,6 +371,27 @@ impl RemoteStorage for LocalFs { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + let src_path = src.with_base(&self.storage_root); + let dst_path = dst.with_base(&self.storage_root); + + // If the destination file already exists, we need to delete it first. + if dst_path.exists() { + fs::remove_file(&dst_path).await?; + } + + // Copy the file. + fs::copy(&src_path, &dst_path).await?; + + // Copy the metadata. + let metadata_path = storage_metadata_path(&src_path); + if metadata_path.exists() { + fs::copy(&metadata_path, storage_metadata_path(&dst_path)).await?; + } + + Ok(()) + } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index fc94281666..d78b6f46cf 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -221,6 +221,8 @@ impl S3Bucket { )), } } + + } pin_project_lite::pin_project! { @@ -546,6 +548,11 @@ impl RemoteStorage for S3Bucket { let paths = std::array::from_ref(path); self.delete_objects(paths).await } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } + } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. @@ -628,4 +635,6 @@ mod tests { } } } + + } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 6d6a5c1d24..7e60aa5f34 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -149,4 +149,8 @@ impl RemoteStorage for UnreliableWrapper { } Ok(()) } + + async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> { + unimplemented!() + } } diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 0338270aaf..5649fc08d1 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -469,8 +469,6 @@ fn create_azure_client( let random = rand::thread_rng().gen::(); let remote_storage_config = RemoteStorageConfig { - max_concurrent_syncs: NonZeroUsize::new(100).unwrap(), - max_sync_errors: NonZeroU32::new(5).unwrap(), storage: RemoteStorageKind::AzureContainer(AzureConfig { container_name: remote_storage_azure_container, container_region: remote_storage_azure_region, diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 7e2aa9f6d7..6baa0a6a62 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -396,8 +396,6 @@ fn create_s3_client( let random = rand::thread_rng().gen::(); let remote_storage_config = RemoteStorageConfig { - max_concurrent_syncs: NonZeroUsize::new(100).unwrap(), - max_sync_errors: NonZeroU32::new(5).unwrap(), storage: RemoteStorageKind::AwsS3(S3Config { bucket_name: remote_storage_s3_bucket, bucket_region: remote_storage_s3_region, diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index fe62a7299a..2b43b015bf 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -1320,12 +1320,6 @@ broker_endpoint = '{broker_endpoint}' assert_eq!( parsed_remote_storage_config, RemoteStorageConfig { - max_concurrent_syncs: NonZeroUsize::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS - ) - .unwrap(), - max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS) - .unwrap(), storage: RemoteStorageKind::LocalFs(local_storage_path.clone()), }, "Remote storage config should correctly parse the local FS config and fill other storage defaults" diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 22efa23f10..4bbae2bf36 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -892,14 +892,6 @@ mod test { std::fs::create_dir_all(remote_fs_dir)?; let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?; let storage_config = RemoteStorageConfig { - max_concurrent_syncs: std::num::NonZeroUsize::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, - ) - .unwrap(), - max_sync_errors: std::num::NonZeroU32::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS, - ) - .unwrap(), storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), }; let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f8895c32dc..27f7230603 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3728,10 +3728,6 @@ pub(crate) mod harness { let remote_fs_dir = conf.workdir.join("localfs"); std::fs::create_dir_all(&remote_fs_dir).unwrap(); let config = RemoteStorageConfig { - // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, - max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(), - // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS, - max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(), storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), }; let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();