Revert "WIP cleanup unused RemoteStorage fields + half-baked copy_file impl"

This reverts commit 7553bbe3f5.
This commit is contained in:
Christian Schwarz
2023-10-26 15:44:26 +00:00
parent 7553bbe3f5
commit bb8531d920
11 changed files with 60 additions and 53 deletions
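For orientation, a minimal sketch (not code from this commit) of the config shape this revert restores: the two NonZero fields come back to RemoteStorageConfig, so callers construct it roughly as below. Field values mirror the test hunks in this diff; `s3_config` is a placeholder for an S3Config built elsewhere.

    use std::num::{NonZeroU32, NonZeroUsize};

    // Sketch only: the two fields this revert re-adds, plus the storage kind.
    let config = RemoteStorageConfig {
        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
        max_sync_errors: NonZeroU32::new(5).unwrap(),
        storage: RemoteStorageKind::AwsS3(s3_config), // `s3_config`: placeholder
    };
    let storage = GenericRemoteStorage::from_config(&config)?;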

===== file 1 of 11 =====

@@ -78,7 +78,7 @@ use regex::Regex;
 use remote_storage::*;
 use serde_json;
 use std::io::Read;
-use std::num::NonZeroUsize;
+use std::num::{NonZeroU32, NonZeroUsize};
 use std::path::Path;
 use std::str;
 use tar::Archive;
@@ -281,6 +281,8 @@ pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRem
         max_keys_per_list_response: None,
     };
     let config = RemoteStorageConfig {
+        max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
+        max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
         storage: RemoteStorageKind::AwsS3(config),
     };
     GenericRemoteStorage::from_config(&config)

===== file 2 of 11 =====

@@ -353,8 +353,4 @@ impl RemoteStorage for AzureBlobStorage {
         }
         Ok(())
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        unimplemented!()
-    }
 }

===== file 3 of 11 =====

@@ -12,7 +12,13 @@ mod local_fs;
 mod s3_bucket;
 mod simulate_failures;
 
-use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, pin::Pin, sync::Arc};
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    num::{NonZeroU32, NonZeroUsize},
+    pin::Pin,
+    sync::Arc,
+};
 
 use anyhow::{bail, Context};
 use camino::{Utf8Path, Utf8PathBuf};
@@ -28,6 +34,12 @@ pub use self::{
 };
 
 use s3_bucket::RequestKind;
+/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
+/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
+/// during start (where local and remote timelines are compared and initial sync tasks are scheduled) and timeline attach.
+/// Both cases may trigger timeline download, that might download a lot of layers. This concurrency is limited by the clients internally, if needed.
+pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS: usize = 50;
+pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// Currently, sync happens with AWS S3, that has two limits on requests per second:
 /// ~200 RPS for IAM services
 /// <https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html>
@@ -100,7 +112,7 @@ impl RemotePath {
         self.0.file_name()
     }
 
-    pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
+    pub fn join(&self, segment: &Utf8Path) -> Self {
         Self(self.0.join(segment))
     }
 
@@ -171,8 +183,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
 
     async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()>;
 }
 
 pub struct Download {
@@ -318,15 +328,6 @@ impl GenericRemoteStorage {
             Self::Unreliable(s) => s.delete_objects(paths).await,
         }
     }
-
-    pub async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        match self {
-            Self::LocalFs(s) => s.copy_object(src, dst).await,
-            Self::AwsS3(s) => s.copy_object(src, dst).await,
-            Self::AzureBlob(s) => s.copy_object(src, dst).await,
-            Self::Unreliable(s) => s.copy_object(src, dst).await,
-        }
-    }
 }
 
 impl GenericRemoteStorage {
@@ -393,6 +394,10 @@ pub struct StorageMetadata(HashMap<String, String>);
 /// External backup storage configuration, enough for creating a client for that storage.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct RemoteStorageConfig {
+    /// Max allowed number of concurrent sync operations between the API user and the remote storage.
+    pub max_concurrent_syncs: NonZeroUsize,
+    /// Max allowed errors before the sync task is considered failed and evicted.
+    pub max_sync_errors: NonZeroU32,
     /// The storage connection configuration.
     pub storage: RemoteStorageKind,
 }
@@ -488,6 +493,18 @@ impl RemoteStorageConfig {
         let use_azure = container_name.is_some() && container_region.is_some();
 
+        let max_concurrent_syncs = NonZeroUsize::new(
+            parse_optional_integer("max_concurrent_syncs", toml)?
+                .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
+        )
+        .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
+
+        let max_sync_errors = NonZeroU32::new(
+            parse_optional_integer("max_sync_errors", toml)?
+                .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
+        )
+        .context("Failed to parse 'max_sync_errors' as a positive integer")?;
+
         let default_concurrency_limit = if use_azure {
             DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT
         } else {
@@ -569,7 +586,11 @@ impl RemoteStorageConfig {
             }
         };
 
-        Ok(Some(RemoteStorageConfig { storage }))
+        Ok(Some(RemoteStorageConfig {
+            max_concurrent_syncs,
+            max_sync_errors,
+            storage,
+        }))
     }
 }
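Aside (not part of the diff): because both restored fields are NonZero types, a configured value of 0 surfaces as the "positive integer" error re-added above rather than silently disabling the limit. A self-contained sketch of that behavior; `parse_positive` is a hypothetical helper, not a function in this crate:

    use anyhow::Context;
    use std::num::NonZeroUsize;

    // Hypothetical helper mirroring the restored parsing: 0 maps to None,
    // which with_context turns into the same kind of "positive integer" error.
    fn parse_positive(name: &str, value: usize) -> anyhow::Result<NonZeroUsize> {
        NonZeroUsize::new(value)
            .with_context(|| format!("Failed to parse '{name}' as a positive integer"))
    }

    fn main() {
        assert!(parse_positive("max_concurrent_syncs", 100).is_ok());
        assert!(parse_positive("max_concurrent_syncs", 0).is_err()); // zero rejected
    }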

===== file 4 of 11 =====

@@ -371,27 +371,6 @@ impl RemoteStorage for LocalFs {
         }
         Ok(())
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        let src_path = src.with_base(&self.storage_root);
-        let dst_path = dst.with_base(&self.storage_root);
-
-        // If the destination file already exists, we need to delete it first.
-        if dst_path.exists() {
-            fs::remove_file(&dst_path).await?;
-        }
-
-        // Copy the file.
-        fs::copy(&src_path, &dst_path).await?;
-
-        // Copy the metadata.
-        let metadata_path = storage_metadata_path(&src_path);
-        if metadata_path.exists() {
-            fs::copy(&metadata_path, storage_metadata_path(&dst_path)).await?;
-        }
-
-        Ok(())
-    }
 }
 
 fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {

===== file 5 of 11 =====

@@ -221,8 +221,6 @@ impl S3Bucket {
             )),
         }
     }
-
-
 }
 
 pin_project_lite::pin_project! {
@@ -548,11 +546,6 @@ impl RemoteStorage for S3Bucket {
         let paths = std::array::from_ref(path);
 
         self.delete_objects(paths).await
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        unimplemented!()
-    }
-
 }
 
 /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
@@ -635,6 +628,4 @@ mod tests {
             }
         }
     }
-
-
 }

===== file 6 of 11 =====

@@ -149,8 +149,4 @@ impl RemoteStorage for UnreliableWrapper {
         }
         Ok(())
     }
-
-    async fn copy_object(&self, src: &RemotePath, dst: &RemotePath) -> anyhow::Result<()> {
-        unimplemented!()
-    }
 }

===== file 7 of 11 =====

@@ -469,6 +469,8 @@ fn create_azure_client(
     let random = rand::thread_rng().gen::<u32>();
 
     let remote_storage_config = RemoteStorageConfig {
+        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
+        max_sync_errors: NonZeroU32::new(5).unwrap(),
         storage: RemoteStorageKind::AzureContainer(AzureConfig {
             container_name: remote_storage_azure_container,
             container_region: remote_storage_azure_region,

===== file 8 of 11 =====

@@ -396,6 +396,8 @@ fn create_s3_client(
     let random = rand::thread_rng().gen::<u32>();
 
     let remote_storage_config = RemoteStorageConfig {
+        max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
+        max_sync_errors: NonZeroU32::new(5).unwrap(),
         storage: RemoteStorageKind::AwsS3(S3Config {
             bucket_name: remote_storage_s3_bucket,
             bucket_region: remote_storage_s3_region,

===== file 9 of 11 =====

@@ -1320,6 +1320,12 @@ broker_endpoint = '{broker_endpoint}'
         assert_eq!(
             parsed_remote_storage_config,
             RemoteStorageConfig {
+                max_concurrent_syncs: NonZeroUsize::new(
+                    remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS
+                )
+                .unwrap(),
+                max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
+                    .unwrap(),
                 storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
             },
             "Remote storage config should correctly parse the local FS config and fill other storage defaults"

===== file 10 of 11 =====

@@ -892,6 +892,14 @@ mod test {
         std::fs::create_dir_all(remote_fs_dir)?;
         let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
 
         let storage_config = RemoteStorageConfig {
+            max_concurrent_syncs: std::num::NonZeroUsize::new(
+                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
+            )
+            .unwrap(),
+            max_sync_errors: std::num::NonZeroU32::new(
+                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
+            )
+            .unwrap(),
             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
         };
         let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();

===== file 11 of 11 =====

@@ -3728,6 +3728,10 @@ pub(crate) mod harness {
         let remote_fs_dir = conf.workdir.join("localfs");
         std::fs::create_dir_all(&remote_fs_dir).unwrap();
         let config = RemoteStorageConfig {
+            // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
+            max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
+            // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
+            max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
             storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
         };
         let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
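Aside (assumption, not part of the diff): if the two TODOs above were resolved, the harness would presumably use the library defaults that this revert restores, mirroring the pageserver test hunk earlier in this commit:

    // Sketch only: harness config using the restored default constants.
    let config = RemoteStorageConfig {
        max_concurrent_syncs: std::num::NonZeroUsize::new(
            remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
        )
        .unwrap(),
        max_sync_errors: std::num::NonZeroU32::new(
            remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
        )
        .unwrap(),
        storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
    };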