diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index 8a10e098a1..55db91dc31 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -14,8 +14,10 @@ use std::{
     ffi::OsStr,
     fmt::Debug,
     num::{NonZeroU32, NonZeroUsize},
+    ops::Deref,
     path::{Path, PathBuf},
     pin::Pin,
+    sync::Arc,
 };
 
 use anyhow::{bail, Context};
@@ -24,10 +26,7 @@ use tokio::io;
 use toml_edit::Item;
 use tracing::info;
 
-pub use self::{
-    local_fs::LocalFs,
-    s3_bucket::{S3Bucket, S3ObjectKey},
-};
+pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket};
 
 /// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
 /// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
@@ -42,22 +41,62 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
 pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
 
+const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
+
+/// A key that refers to an object in remote storage. It works much like a `Path`,
+/// but it is a separate datatype so that local paths and remote keys cannot be
+/// mixed up accidentally.
+#[derive(Clone, PartialEq, Eq)]
+pub struct RemoteObjectId(String);
+
+impl From<RemoteObjectId> for String {
+    fn from(id: RemoteObjectId) -> Self {
+        id.0
+    }
+}
+
+impl RemoteObjectId {
+    /// Returns the last component of the id, i.e. the "file name":
+    /// turns `a/b/c` or `a/b/c/` into `c`.
+    pub fn object_name(&self) -> Option<&str> {
+        // Corner case: `char::to_string` is not const, that's why this is more verbose
+        // than it needs to be, see https://github.com/rust-lang/rust/issues/88674
+        if self.0.len() == 1 && self.0.chars().next().unwrap() == REMOTE_STORAGE_PREFIX_SEPARATOR {
+            return None;
+        }
+
+        if self.0.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
+            self.0.rsplit(REMOTE_STORAGE_PREFIX_SEPARATOR).nth(1)
+        } else {
+            self.0
+                .rsplit_once(REMOTE_STORAGE_PREFIX_SEPARATOR)
+                .map(|(_, last)| last)
+        }
+    }
+}
+
+impl Debug for RemoteObjectId {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+        self.0.fmt(fmt)
+    }
+}
+
 /// Storage (potentially remote) API to manage its state.
 /// This storage tries to be unaware of any layered repository context,
 /// providing basic CRUD operations for storage files.
 #[async_trait::async_trait]
-pub trait RemoteStorage: Send + Sync {
-    /// A way to uniquely reference a file in the remote storage.
-    type RemoteObjectId;
-
+pub trait RemoteStorage: Send + Sync + 'static {
     /// Attempts to derive the storage path out of the local path, if the latter is correct.
-    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId>;
+    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId>;
 
     /// Gets the download path of the given storage file.
-    fn local_path(&self, remote_object_id: &Self::RemoteObjectId) -> anyhow::Result<PathBuf>;
+    fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf>;
 
     /// Lists all items the storage has right now.
-    async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
+    async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>>;
 
     /// Lists all top level subdirectories for a given prefix
     /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
     ///
@@ -65,34 +104,39 @@ pub trait RemoteStorage: Send + Sync {
     /// so this method doesnt need to.
     async fn list_prefixes(
         &self,
-        prefix: Option<&Self::RemoteObjectId>,
-    ) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
+        prefix: Option<&RemoteObjectId>,
+    ) -> anyhow::Result<Vec<RemoteObjectId>>;
 
     /// Streams the local file contents into the remote storage entry.
     async fn upload(
         &self,
-        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
         // S3 PUT request requires the content length to be specified,
         // otherwise it starts to fail with the concurrent connection count increasing.
         from_size_bytes: usize,
-        to: &Self::RemoteObjectId,
+        to: &RemoteObjectId,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()>;
 
     /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
     /// Returns the metadata, if any was stored with the file previously.
-    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;
+    async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError>;
 
     /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
     /// Returns the metadata, if any was stored with the file previously.
     async fn download_byte_range(
         &self,
-        from: &Self::RemoteObjectId,
+        from: &RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
     ) -> Result<Download, DownloadError>;
 
-    async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
+    async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()>;
+
+    /// Downcast to the LocalFs implementation, for tests.
+    fn as_local(&self) -> Option<&LocalFs> {
+        None
+    }
 }
 
 pub struct Download {
@@ -135,34 +179,37 @@ impl std::error::Error for DownloadError {}
 
 /// Every storage, currently supported.
 /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
-pub enum GenericRemoteStorage {
-    Local(LocalFs),
-    S3(S3Bucket),
+#[derive(Clone)]
+pub struct GenericRemoteStorage(Arc<dyn RemoteStorage>);
+
+impl Deref for GenericRemoteStorage {
+    type Target = dyn RemoteStorage;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.as_ref()
+    }
 }
 
 impl GenericRemoteStorage {
-    pub fn new(
+    pub fn new(storage: impl RemoteStorage) -> Self {
+        Self(Arc::new(storage))
+    }
+
+    pub fn from_config(
         working_directory: PathBuf,
         storage_config: &RemoteStorageConfig,
-    ) -> anyhow::Result<GenericRemoteStorage> {
-        match &storage_config.storage {
+    ) -> anyhow::Result<Self> {
+        Ok(match &storage_config.storage {
             RemoteStorageKind::LocalFs(root) => {
                 info!("Using fs root '{}' as a remote storage", root.display());
-                LocalFs::new(root.clone(), working_directory).map(GenericRemoteStorage::Local)
+                GenericRemoteStorage::new(LocalFs::new(root.clone(), working_directory)?)
             }
             RemoteStorageKind::AwsS3(s3_config) => {
                 info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
-                      s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
-                S3Bucket::new(s3_config, working_directory).map(GenericRemoteStorage::S3)
+                      s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
+                GenericRemoteStorage::new(S3Bucket::new(s3_config, working_directory)?)
             }
-        }
-    }
-
-    pub fn as_local(&self) -> Option<&LocalFs> {
-        match self {
-            Self::Local(local_fs) => Some(local_fs),
-            _ => None,
-        }
+        })
     }
 
     /// Takes storage object contents and its size and uploads to remote storage,
@@ -172,47 +219,26 @@ impl GenericRemoteStorage {
     /// this path is used for the remote object id conversion only.
     pub async fn upload_storage_object(
         &self,
-        from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
+        from: Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static>,
         from_size_bytes: usize,
         from_path: &Path,
     ) -> anyhow::Result<()> {
-        async fn do_upload_storage_object<P, S>(
-            storage: &S,
-            from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
-            from_size_bytes: usize,
-            from_path: &Path,
-        ) -> anyhow::Result<()>
-        where
-            P: std::fmt::Debug + Send + Sync + 'static,
-            S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
-        {
-            let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
+        let target_storage_path = self.remote_object_id(from_path).with_context(|| {
+            format!(
+                "Failed to get the storage path for source local path '{}'",
+                from_path.display()
+            )
+        })?;
+
+        self.upload(from, from_size_bytes, &target_storage_path, None)
+            .await
+            .with_context(|| {
                 format!(
-                    "Failed to get the storage path for source local path '{}'",
-                    from_path.display()
+                    "Failed to upload from '{}' to storage path '{:?}'",
+                    from_path.display(),
+                    target_storage_path
                 )
-            })?;
-
-            storage
-                .upload(from, from_size_bytes, &target_storage_path, None)
-                .await
-                .with_context(|| {
-                    format!(
-                        "Failed to upload from '{}' to storage path '{:?}'",
-                        from_path.display(),
-                        target_storage_path
-                    )
-                })
-        }
-
-        match self {
-            GenericRemoteStorage::Local(storage) => {
-                do_upload_storage_object(storage, from, from_size_bytes, from_path).await
-            }
-            GenericRemoteStorage::S3(storage) => {
-                do_upload_storage_object(storage, from, from_size_bytes, from_path).await
-            }
-        }
+            })
     }
 
     /// Downloads the storage object into the `to_path` provided.
@@ -222,42 +248,22 @@ impl GenericRemoteStorage {
         byte_range: Option<(u64, Option<u64>)>,
         to_path: &Path,
     ) -> Result<Download, DownloadError> {
-        async fn do_download_storage_object<P, S>(
-            storage: &S,
-            byte_range: Option<(u64, Option<u64>)>,
-            to_path: &Path,
-        ) -> Result<Download, DownloadError>
-        where
-            P: std::fmt::Debug + Send + Sync + 'static,
-            S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
-        {
-            let remote_object_path = storage
-                .remote_object_id(to_path)
-                .with_context(|| {
-                    format!(
-                        "Failed to get the storage path for target local path '{}'",
-                        to_path.display()
-                    )
-                })
-                .map_err(DownloadError::BadInput)?;
+        let remote_object_path = self
+            .remote_object_id(to_path)
+            .with_context(|| {
+                format!(
+                    "Failed to get the storage path for target local path '{}'",
+                    to_path.display()
+                )
+            })
+            .map_err(DownloadError::BadInput)?;
 
-            match byte_range {
-                Some((start, end)) => {
-                    storage
-                        .download_byte_range(&remote_object_path, start, end)
-                        .await
-                }
-                None => storage.download(&remote_object_path).await,
-            }
-        }
-
-        match self {
-            GenericRemoteStorage::Local(storage) => {
-                do_download_storage_object(storage, byte_range, to_path).await
-            }
-            GenericRemoteStorage::S3(storage) => {
-                do_download_storage_object(storage, byte_range, to_path).await
+        match byte_range {
+            Some((start, end)) => {
+                self.download_byte_range(&remote_object_path, start, end)
+                    .await
             }
+            None => self.download(&remote_object_path).await,
         }
     }
 }
@@ -463,4 +469,23 @@ mod tests {
             "/foo/bar.baz..temp"
         );
     }
+
+    #[test]
+    fn object_name() {
+        let k = RemoteObjectId("a/b/c".to_owned());
+        assert_eq!(k.object_name(), Some("c"));
+
+        let k = RemoteObjectId("a/b/c/".to_owned());
+        assert_eq!(k.object_name(), Some("c"));
+
+        let k = RemoteObjectId("a/".to_owned());
+        assert_eq!(k.object_name(), Some("a"));
+
+        // XXX is it impossible to have an empty key?
+        let k = RemoteObjectId("".to_owned());
+        assert_eq!(k.object_name(), None);
+
+        let k = RemoteObjectId("/".to_owned());
+        assert_eq!(k.object_name(), None);
+    }
 }
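
To make the new API concrete, here is a minimal usage sketch (not part of the patch; the helper function and the path are invented, and any backend implementing the trait works):

    use std::path::{Path, PathBuf};
    use remote_storage::{RemoteObjectId, RemoteStorage};

    // Hypothetical helper: works with any backend through the trait object.
    async fn describe(storage: &dyn RemoteStorage, local: &Path) -> anyhow::Result<()> {
        // Local path -> remote key, and back to the local download destination.
        let id: RemoteObjectId = storage.remote_object_id(local)?;
        let download_to: PathBuf = storage.local_path(&id)?;

        // object_name() yields the last key component, with or without a
        // trailing separator: "a/b/c" and "a/b/c/" both give Some("c"),
        // while "" and "/" give None (see the tests above).
        if let Some(name) = id.object_name() {
            println!("'{name}' would be downloaded to {}", download_to.display());
        }
        Ok(())
    }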
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index ddf6c01759..2561c0ca24 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -17,10 +17,19 @@ use tokio::{
 };
 use tracing::*;
 
-use crate::{path_with_suffix_extension, Download, DownloadError};
+use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectId};
 
 use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
 
+/// Convert a Path in the remote storage into a RemoteObjectId
+fn remote_object_id_from_path(path: &Path) -> anyhow::Result<RemoteObjectId> {
+    Ok(RemoteObjectId(
+        path.to_str()
+            .ok_or_else(|| anyhow::anyhow!("unexpected characters found in path"))?
+            .to_string(),
+    ))
+}
+
 pub struct LocalFs {
     working_directory: PathBuf,
     storage_root: PathBuf,
@@ -43,11 +52,17 @@ impl LocalFs {
         })
     }
 
-    fn resolve_in_storage(&self, path: &Path) -> anyhow::Result<PathBuf> {
+    /// Get the absolute path in the local filesystem for the given remote object.
+    ///
+    /// This is public so that it can be used in tests. Should not be used elsewhere.
+    pub fn resolve_in_storage(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf> {
+        let path = PathBuf::from(&remote_object_id.0);
         if path.is_relative() {
             Ok(self.storage_root.join(path))
         } else if path.starts_with(&self.storage_root) {
-            Ok(path.to_path_buf())
+            Ok(path)
         } else {
             bail!(
                 "Path '{}' does not belong to the current storage",
@@ -85,38 +100,42 @@ impl LocalFs {
 
 #[async_trait::async_trait]
 impl RemoteStorage for LocalFs {
-    type RemoteObjectId = PathBuf;
-
-    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId> {
-        Ok(self.storage_root.join(
+    /// Convert a "local" path into a "remote path"
+    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId> {
+        let path = self.storage_root.join(
             strip_path_prefix(&self.working_directory, local_path)
                 .context("local path does not belong to this storage")?,
-        ))
+        );
+        remote_object_id_from_path(&path)
     }
 
-    fn local_path(&self, storage_path: &Self::RemoteObjectId) -> anyhow::Result<PathBuf> {
-        let relative_path = strip_path_prefix(&self.storage_root, storage_path)
+    fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf> {
+        let storage_path = PathBuf::from(&remote_object_id.0);
+        let relative_path = strip_path_prefix(&self.storage_root, &storage_path)
             .context("local path does not belong to this storage")?;
         Ok(self.working_directory.join(relative_path))
     }
 
-    async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
+    async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>> {
         get_all_files(&self.storage_root, true).await
     }
 
     async fn list_prefixes(
         &self,
-        prefix: Option<&Self::RemoteObjectId>,
-    ) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
-        let path = prefix.unwrap_or(&self.storage_root);
+        prefix: Option<&RemoteObjectId>,
+    ) -> anyhow::Result<Vec<RemoteObjectId>> {
+        let path = match prefix {
+            Some(prefix) => Path::new(&prefix.0),
+            None => &self.storage_root,
+        };
         get_all_files(path, false).await
     }
 
     async fn upload(
         &self,
-        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
         from_size_bytes: usize,
-        to: &Self::RemoteObjectId,
+        to: &RemoteObjectId,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
         let target_file_path = self.resolve_in_storage(to)?;
@@ -197,7 +216,7 @@ impl RemoteStorage for LocalFs {
         Ok(())
     }
 
-    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
+    async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError> {
         let file_path = self
             .resolve_in_storage(from)
             .map_err(DownloadError::BadInput)?;
@@ -231,7 +250,7 @@ impl RemoteStorage for LocalFs {
 
     async fn download_byte_range(
         &self,
-        from: &Self::RemoteObjectId,
+        from: &RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
     ) -> Result<Download, DownloadError> {
@@ -285,7 +304,7 @@ impl RemoteStorage for LocalFs {
         }
     }
 
-    async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
+    async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()> {
         let file_path = self.resolve_in_storage(path)?;
         if file_path.exists() && file_path.is_file() {
             Ok(fs::remove_file(file_path).await?)
@@ -296,6 +315,10 @@ impl RemoteStorage for LocalFs {
             )
         }
     }
+
+    fn as_local(&self) -> Option<&LocalFs> {
+        Some(self)
+    }
 }
 
 fn storage_metadata_path(original_path: &Path) -> PathBuf {
@@ -305,7 +328,7 @@ fn storage_metadata_path(original_path: &Path) -> PathBuf {
 fn get_all_files<'a, P>(
     directory_path: P,
     recursive: bool,
-) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
+) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<RemoteObjectId>>> + Send + Sync + 'a>>
 where
     P: AsRef<Path> + Send + Sync + 'a,
 {
@@ -322,12 +345,12 @@ where
                     debug!("{:?} us a symlink, skipping", entry_path)
                 } else if file_type.is_dir() {
                     if recursive {
-                        paths.extend(get_all_files(entry_path, true).await?.into_iter())
+                        paths.extend(get_all_files(&entry_path, true).await?.into_iter())
                     } else {
-                        paths.push(dir_entry.path())
+                        paths.push(remote_object_id_from_path(&dir_entry.path())?)
                    }
                 } else {
-                    paths.push(dir_entry.path());
+                    paths.push(remote_object_id_from_path(&dir_entry.path())?);
                 }
             }
             Ok(paths)
@@ -389,9 +412,15 @@ mod pure_tests {
             .join("file_name");
         let expected_path = storage_root.join(local_path.strip_prefix(&workdir)?);
 
+        let actual_path = PathBuf::from(
+            storage
+                .remote_object_id(&local_path)
+                .expect("Matching path should map to storage path normally")
+                .0,
+        );
         assert_eq!(
             expected_path,
-            storage.remote_object_id(&local_path).expect("Matching path should map to storage path normally"),
+            actual_path,
             "File paths from workdir should be stored in local fs storage with the same path they have relative to the workdir"
         );
 
@@ -452,7 +481,9 @@ mod pure_tests {
         assert_eq!(
             local_path,
             storage
-                .local_path(&storage_root.join(local_path.strip_prefix(&workdir)?))
+                .local_path(&remote_object_id_from_path(
+                    &storage_root.join(local_path.strip_prefix(&workdir)?)
+                )?)
.expect("For a valid input, valid local path should be parsed"), "Should be able to parse metadata out of the correctly named remote delta file" ); @@ -476,8 +507,7 @@ mod pure_tests { #[test] fn local_path_negatives() -> anyhow::Result<()> { #[track_caller] - #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.local_path` parameter requirements - fn local_path_error(storage: &LocalFs, storage_path: &PathBuf) -> String { + fn local_path_error(storage: &LocalFs, storage_path: &RemoteObjectId) -> String { match storage.local_path(storage_path) { Ok(wrong_path) => panic!( "Expected local path input {:?} to cause an error, but got file path: {:?}", @@ -494,7 +524,8 @@ mod pure_tests { }; let totally_wrong_path = "wrong_wrong_wrong"; - let error_message = local_path_error(&storage, &PathBuf::from(totally_wrong_path)); + let error_message = + local_path_error(&storage, &RemoteObjectId(totally_wrong_path.to_string())); assert!(error_message.contains(totally_wrong_path)); Ok(()) @@ -537,7 +568,7 @@ mod fs_tests { storage: &LocalFs, #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.local_path` parameter requirements - remote_storage_path: &PathBuf, + remote_storage_path: &RemoteObjectId, expected_metadata: Option<&StorageMetadata>, ) -> anyhow::Result { let mut download = storage @@ -568,12 +599,20 @@ mod fs_tests { "whatever_contents", ) .await?; - let target_path = PathBuf::from("/").join("somewhere").join("else"); - match storage.upload(file, size, &target_path, None).await { + let target_path = "/somewhere/else"; + match storage + .upload( + Box::new(file), + size, + &RemoteObjectId(target_path.to_string()), + None, + ) + .await + { Ok(()) => panic!("Should not allow storing files with wrong target path"), Err(e) => { let message = format!("{:?}", e); - assert!(message.contains(&target_path.display().to_string())); + assert!(message.contains(target_path)); assert!(message.contains("does not belong to the current storage")); } } @@ -606,20 +645,20 @@ mod fs_tests { // Check that you get an error if the size parameter doesn't match the actual // size of the stream. storage - .upload(content.clone(), 0, &id, None) + .upload(Box::new(content.clone()), 0, &id, None) .await .expect_err("upload with zero size succeeded"); storage - .upload(content.clone(), 4, &id, None) + .upload(Box::new(content.clone()), 4, &id, None) .await .expect_err("upload with too short size succeeded"); storage - .upload(content.clone(), 6, &id, None) + .upload(Box::new(content.clone()), 6, &id, None) .await .expect_err("upload with too large size succeeded"); // Correct size is 5, this should succeed. 
-        storage.upload(content, 5, &id, None).await?;
+        storage.upload(Box::new(content), 5, &id, None).await?;
 
         Ok(())
     }
@@ -643,8 +682,8 @@ mod fs_tests {
             "We should upload and download the same contents"
         );
 
-        let non_existing_path = PathBuf::from("somewhere").join("else");
-        match storage.download(&non_existing_path).await {
+        let non_existing_path = "somewhere/else";
+        match storage.download(&RemoteObjectId(non_existing_path.to_string())).await {
             Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
             other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
         }
@@ -783,7 +822,7 @@ mod fs_tests {
             Err(e) => {
                 let error_string = e.to_string();
                 assert!(error_string.contains("does not exist"));
-                assert!(error_string.contains(&upload_target.display().to_string()));
+                assert!(error_string.contains(&upload_target.0));
             }
         }
         Ok(())
@@ -844,15 +883,19 @@ mod fs_tests {
         storage: &LocalFs,
         name: &str,
         metadata: Option<StorageMetadata>,
-    ) -> anyhow::Result<PathBuf> {
+    ) -> anyhow::Result<RemoteObjectId> {
         let timeline_path = workdir.join("timelines").join("some_timeline");
         let relative_timeline_path = timeline_path.strip_prefix(&workdir)?;
         let storage_path = storage.storage_root.join(relative_timeline_path).join(name);
+        let remote_object_id = RemoteObjectId(storage_path.to_str().unwrap().to_string());
 
         let from_path = storage.working_directory.join(name);
         let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
-        storage.upload(file, size, &storage_path, metadata).await?;
-        Ok(storage_path)
+
+        storage
+            .upload(Box::new(file), size, &remote_object_id, metadata)
+            .await?;
+        remote_object_id_from_path(&storage_path)
     }
 
     async fn create_file_for_upload(
@@ -877,9 +920,9 @@ mod fs_tests {
         format!("contents for {name}")
     }
 
-    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
+    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemoteObjectId>> {
         let mut files = storage.list().await?;
-        files.sort();
+        files.sort_by(|a, b| a.0.cmp(&b.0));
         Ok(files)
     }
 }
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index db31200c36..74632430cd 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -19,7 +19,10 @@ use tokio::{io, sync::Semaphore};
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
-use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
+use crate::{
+    strip_path_prefix, Download, DownloadError, RemoteObjectId, RemoteStorage, S3Config,
+    REMOTE_STORAGE_PREFIX_SEPARATOR,
+};
 
 use super::StorageMetadata;
 
@@ -88,50 +91,26 @@ pub(super) mod metrics {
     }
 }
 
-const S3_PREFIX_SEPARATOR: char = '/';
+fn download_destination(
+    id: &RemoteObjectId,
+    workdir: &Path,
+    prefix_to_strip: Option<&str>,
+) -> PathBuf {
+    let path_without_prefix = match prefix_to_strip {
+        Some(prefix) => id.0.strip_prefix(prefix).unwrap_or_else(|| {
+            panic!(
+                "Could not strip prefix '{}' from S3 object key '{}'",
+                prefix, id.0
+            )
+        }),
+        None => &id.0,
+    };
 
-#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
-pub struct S3ObjectKey(String);
-
-impl S3ObjectKey {
-    /// Turn a/b/c or a/b/c/ into c
-    pub fn object_name(&self) -> Option<&str> {
-        // corner case, char::to_string is not const, thats why this is more verbose than it needs to be
-        // see https://github.com/rust-lang/rust/issues/88674
-        if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR {
-            return None;
-        }
-
-        if self.0.ends_with(S3_PREFIX_SEPARATOR) {
-            self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1)
-        } else {
-            self.0
-                .rsplit_once(S3_PREFIX_SEPARATOR)
-                .map(|(_, last)| last)
-        }
-    }
-
-    fn key(&self) -> &str {
-        &self.0
-    }
-
-    fn download_destination(&self, workdir: &Path, prefix_to_strip: Option<&str>) -> PathBuf {
-        let path_without_prefix = match prefix_to_strip {
-            Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| {
-                panic!(
-                    "Could not strip prefix '{}' from S3 object key '{}'",
-                    prefix, self.0
-                )
-            }),
-            None => &self.0,
-        };
-
-        workdir.join(
-            path_without_prefix
-                .split(S3_PREFIX_SEPARATOR)
-                .collect::<PathBuf>(),
-        )
-    }
+    workdir.join(
+        path_without_prefix
+            .split(REMOTE_STORAGE_PREFIX_SEPARATOR)
+            .collect::<PathBuf>(),
+    )
 }
 
 /// AWS S3 storage.
@@ -193,12 +172,12 @@ impl S3Bucket {
         let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
             let mut prefix = prefix;
-            while prefix.starts_with(S3_PREFIX_SEPARATOR) {
+            while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                 prefix = &prefix[1..]
             }
 
             let mut prefix = prefix.to_string();
-            while prefix.ends_with(S3_PREFIX_SEPARATOR) {
+            while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
                 prefix.pop();
             }
             prefix
@@ -249,23 +228,25 @@ impl S3Bucket {
 
 #[async_trait::async_trait]
 impl RemoteStorage for S3Bucket {
-    type RemoteObjectId = S3ObjectKey;
-
-    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId> {
+    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId> {
         let relative_path = strip_path_prefix(&self.workdir, local_path)?;
         let mut key = self.prefix_in_bucket.clone().unwrap_or_default();
         for segment in relative_path {
-            key.push(S3_PREFIX_SEPARATOR);
+            key.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
             key.push_str(&segment.to_string_lossy());
         }
-        Ok(S3ObjectKey(key))
+        Ok(RemoteObjectId(key))
     }
 
-    fn local_path(&self, storage_path: &Self::RemoteObjectId) -> anyhow::Result<PathBuf> {
-        Ok(storage_path.download_destination(&self.workdir, self.prefix_in_bucket.as_deref()))
+    fn local_path(&self, storage_path: &RemoteObjectId) -> anyhow::Result<PathBuf> {
+        Ok(download_destination(
+            storage_path,
+            &self.workdir,
+            self.prefix_in_bucket.as_deref(),
+        ))
     }
 
-    async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
+    async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>> {
         let mut document_keys = Vec::new();
 
         let mut continuation_token = None;
@@ -296,7 +277,7 @@ impl RemoteStorage for S3Bucket {
                     .contents
                     .unwrap_or_default()
                     .into_iter()
-                    .filter_map(|o| Some(S3ObjectKey(o.key?))),
+                    .filter_map(|o| Some(RemoteObjectId(o.key?))),
             );
 
             match fetch_response.continuation_token {
@@ -312,8 +293,8 @@ impl RemoteStorage for S3Bucket {
     /// Note: it wont include empty "directories"
     async fn list_prefixes(
         &self,
-        prefix: Option<&Self::RemoteObjectId>,
-    ) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
+        prefix: Option<&RemoteObjectId>,
+    ) -> anyhow::Result<Vec<RemoteObjectId>> {
         // get the passed prefix or if it is not set use prefix_in_bucket value
         let list_prefix = prefix
             .map(|p| p.0.clone())
             .or_else(|| self.prefix_in_bucket.clone())
             .map(|mut p| {
                 // required to end with a separator
                 // otherwise request will return only the entry of a prefix
-                if !p.ends_with(S3_PREFIX_SEPARATOR) {
-                    p.push(S3_PREFIX_SEPARATOR);
+                if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
+                    p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
                 }
                 p
             });
@@ -345,7 +326,7 @@ impl RemoteStorage for S3Bucket {
                     bucket: self.bucket_name.clone(),
                     prefix: list_prefix.clone(),
                     continuation_token,
-                    delimiter: Some(S3_PREFIX_SEPARATOR.to_string()),
+                    delimiter: Some(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()),
                     ..ListObjectsV2Request::default()
                 })
                 .await
@@ -359,7 +340,7 @@ impl RemoteStorage for S3Bucket {
                     .common_prefixes
                     .unwrap_or_default()
                     .into_iter()
-                    .filter_map(|o| Some(S3ObjectKey(o.prefix?))),
+                    .filter_map(|o| Some(RemoteObjectId(o.prefix?))),
             );
 
             match fetch_response.continuation_token {
@@ -373,9 +354,9 @@ impl RemoteStorage for S3Bucket {
 
     async fn upload(
         &self,
-        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
+        from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
         from_size_bytes: usize,
-        to: &Self::RemoteObjectId,
+        to: &RemoteObjectId,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
         let _guard = self
@@ -392,7 +373,7 @@ impl RemoteStorage for S3Bucket {
                     from_size_bytes,
                 )),
                 bucket: self.bucket_name.clone(),
-                key: to.key().to_owned(),
+                key: to.0.to_owned(),
                 metadata: metadata.map(|m| m.0),
                 ..PutObjectRequest::default()
             })
@@ -404,10 +385,10 @@ impl RemoteStorage for S3Bucket {
         Ok(())
     }
 
-    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
+    async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError> {
         self.download_object(GetObjectRequest {
             bucket: self.bucket_name.clone(),
-            key: from.key().to_owned(),
+            key: from.0.to_owned(),
             ..GetObjectRequest::default()
         })
         .await
@@ -415,7 +396,7 @@ impl RemoteStorage for S3Bucket {
 
     async fn download_byte_range(
         &self,
-        from: &Self::RemoteObjectId,
+        from: &RemoteObjectId,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
     ) -> Result<Download, DownloadError> {
@@ -429,14 +410,14 @@ impl RemoteStorage for S3Bucket {
 
         self.download_object(GetObjectRequest {
             bucket: self.bucket_name.clone(),
-            key: from.key().to_owned(),
+            key: from.0.to_owned(),
             range,
             ..GetObjectRequest::default()
         })
         .await
     }
 
-    async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
+    async fn delete(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<()> {
         let _guard = self
             .concurrency_limiter
             .acquire()
@@ -448,7 +429,7 @@ impl RemoteStorage for S3Bucket {
         self.client
             .delete_object(DeleteObjectRequest {
                 bucket: self.bucket_name.clone(),
-                key: path.key().to_owned(),
+                key: remote_object_id.0.to_owned(),
                 ..DeleteObjectRequest::default()
             })
             .await
@@ -467,43 +448,24 @@ mod tests {
     use super::*;
 
     #[test]
-    fn object_name() {
-        let k = S3ObjectKey("a/b/c".to_owned());
-        assert_eq!(k.object_name(), Some("c"));
-
-        let k = S3ObjectKey("a/b/c/".to_owned());
-        assert_eq!(k.object_name(), Some("c"));
-
-        let k = S3ObjectKey("a/".to_owned());
-        assert_eq!(k.object_name(), Some("a"));
-
-        // XXX is it impossible to have an empty key?
-        let k = S3ObjectKey("".to_owned());
-        assert_eq!(k.object_name(), None);
-
-        let k = S3ObjectKey("/".to_owned());
-        assert_eq!(k.object_name(), None);
-    }
-
-    #[test]
-    fn download_destination() -> anyhow::Result<()> {
+    fn test_download_destination() -> anyhow::Result<()> {
         let workdir = tempdir()?.path().to_owned();
 
         let local_path = workdir.join("one").join("two").join("test_name");
         let relative_path = local_path.strip_prefix(&workdir)?;
-        let key = S3ObjectKey(format!(
+        let key = RemoteObjectId(format!(
             "{}{}",
-            S3_PREFIX_SEPARATOR,
+            REMOTE_STORAGE_PREFIX_SEPARATOR,
             relative_path
                 .iter()
                 .map(|segment| segment.to_str().unwrap())
                 .collect::<Vec<_>>()
-                .join(&S3_PREFIX_SEPARATOR.to_string()),
+                .join(&REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()),
         ));
 
         assert_eq!(
             local_path,
-            key.download_destination(&workdir, None),
+            download_destination(&key, &workdir, None),
             "Download destination should consist of s3 path joined with the workdir prefix"
         );
 
@@ -520,8 +482,8 @@ mod tests {
 
         let storage = dummy_storage(workdir);
 
-        let expected_key = S3ObjectKey(format!(
-            "{}{S3_PREFIX_SEPARATOR}{segment_1}{S3_PREFIX_SEPARATOR}{segment_2}",
+        let expected_key = RemoteObjectId(format!(
+            "{}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_1}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_2}",
             storage.prefix_in_bucket.as_deref().unwrap_or_default(),
         ));
 
@@ -592,7 +554,7 @@ mod tests {
             storage.prefix_in_bucket.as_deref(),
         );
         assert_eq!(
-            s3_key.download_destination(&workdir, storage.prefix_in_bucket.as_deref()),
+            download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()),
             storage
                 .local_path(&s3_key)
                 .expect("For a valid input, valid S3 info should be parsed"),
@@ -604,7 +566,7 @@ mod tests {
             storage.prefix_in_bucket.as_deref(),
         );
         assert_eq!(
-            s3_key.download_destination(&workdir, storage.prefix_in_bucket.as_deref()),
+            download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()),
             storage
                 .local_path(&s3_key)
                 .expect("For a valid input, valid S3 info should be parsed"),
@@ -645,11 +607,11 @@ mod tests {
         }
     }
 
-    fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> S3ObjectKey {
-        S3ObjectKey(relative_file_path.iter().fold(
+    fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> RemoteObjectId {
+        RemoteObjectId(relative_file_path.iter().fold(
             prefix.unwrap_or_default().to_string(),
             |mut path_string, segment| {
-                path_string.push(S3_PREFIX_SEPARATOR);
+                path_string.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
                 path_string.push_str(segment.to_str().unwrap());
                 path_string
             },
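
A small worked example of the key/path mapping exercised by `download_destination` and `create_s3_key` above (not part of the patch; the bucket prefix, workdir, and key are invented):

    use std::path::{Path, PathBuf};

    fn main() {
        let key = "pageserver/v1/tenants/42/layer_1"; // RemoteObjectId contents
        let prefix = "pageserver/v1";                 // prefix_in_bucket (no trailing '/')
        let workdir = Path::new("/data");

        // Same steps as download_destination(): strip the bucket prefix,
        // then rebuild a filesystem path from the '/'-separated segments.
        let without_prefix = key.strip_prefix(prefix).unwrap(); // "/tenants/42/layer_1"
        let local: PathBuf = workdir.join(without_prefix.split('/').collect::<PathBuf>());
        assert_eq!(local, PathBuf::from("/data/tenants/42/layer_1"));
    }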
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 7a33a548e7..5a43516728 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -1,7 +1,7 @@
 //! Main entry point for the Page Server executable.
 
 use remote_storage::GenericRemoteStorage;
-use std::{env, ops::ControlFlow, path::Path, str::FromStr, sync::Arc};
+use std::{env, ops::ControlFlow, path::Path, str::FromStr};
 use tracing::*;
 
 use anyhow::{bail, Context, Result};
@@ -302,11 +302,13 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
     let remote_storage = conf
         .remote_storage_config
         .as_ref()
-        .map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config))
+        .map(|storage_config| {
+            GenericRemoteStorage::from_config(conf.workdir.clone(), storage_config)
+        })
         .transpose()
-        .context("Failed to init generic remote storage")?
-        .map(Arc::new);
-    let remote_index = tenant_mgr::init_tenant_mgr(conf, remote_storage.as_ref().map(Arc::clone))?;
+        .context("Failed to init generic remote storage")?;
+
+    let remote_index = tenant_mgr::init_tenant_mgr(conf, remote_storage.clone())?;
 
     // Spawn a new thread for the http endpoint
     // bind before launching separate thread so the error reported before startup exits
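
The pageserver.rs simplification above relies on `GenericRemoteStorage` now being a cheap, `Arc`-backed handle. A minimal sketch of the calling pattern this enables (not part of the patch; the fan-out function is invented):

    use remote_storage::GenericRemoteStorage;

    // Hypothetical fan-out: each task gets its own handle via a cheap clone
    // (an Arc refcount bump), and Deref exposes the RemoteStorage methods.
    fn spawn_uploaders(storage: Option<GenericRemoteStorage>) {
        for _ in 0..4 {
            let storage = storage.clone(); // was: storage.as_ref().map(Arc::clone)
            tokio::spawn(async move {
                if let Some(storage) = storage {
                    let _objects = storage.list().await; // RemoteStorage::list via Deref
                }
            });
        }
    }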
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 09c4812067..a31c2fd2a5 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -34,7 +34,7 @@ struct State {
     auth: Option<Arc<JwtAuth>>,
     remote_index: RemoteIndex,
     allowlist_routes: Vec<Uri>,
-    remote_storage: Option<Arc<GenericRemoteStorage>>,
+    remote_storage: Option<GenericRemoteStorage>,
 }
 
 impl State {
@@ -42,7 +42,7 @@ impl State {
         conf: &'static PageServerConf,
         auth: Option<Arc<JwtAuth>>,
         remote_index: RemoteIndex,
-        remote_storage: Option<Arc<GenericRemoteStorage>>,
+        remote_storage: Option<GenericRemoteStorage>,
     ) -> anyhow::Result<Self> {
         let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
             .iter()
@@ -659,7 +659,7 @@ pub fn make_router(
     conf: &'static PageServerConf,
     auth: Option<Arc<JwtAuth>>,
     remote_index: RemoteIndex,
-    remote_storage: Option<Arc<GenericRemoteStorage>>,
+    remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
     let spec = include_bytes!("openapi_spec.yml");
     let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs
index 491f882e0b..42fd6b8ea8 100644
--- a/pageserver/src/storage_sync.rs
+++ b/pageserver/src/storage_sync.rs
@@ -150,7 +150,7 @@ use std::{
     num::{NonZeroU32, NonZeroUsize},
     ops::ControlFlow,
     path::{Path, PathBuf},
-    sync::{Arc, Condvar, Mutex},
+    sync::{Condvar, Mutex},
 };
 
 use anyhow::{anyhow, bail, Context};
@@ -222,7 +222,7 @@ pub struct SyncStartupData {
 /// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
 pub fn start_local_timeline_sync(
     config: &'static PageServerConf,
-    storage: Option<Arc<GenericRemoteStorage>>,
+    storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<SyncStartupData> {
     let local_timeline_files = local_tenant_timeline_files(config)
         .context("Failed to collect local tenant timeline files")?;
@@ -766,7 +766,7 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
 pub(super) fn spawn_storage_sync_thread(
     conf: &'static PageServerConf,
     local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
-    storage: Arc<GenericRemoteStorage>,
+    storage: GenericRemoteStorage,
     max_concurrent_timelines_sync: NonZeroUsize,
     max_sync_errors: NonZeroU32,
 ) -> anyhow::Result<RemoteIndex> {
@@ -825,12 +825,12 @@ pub(super) fn spawn_storage_sync_thread(
 fn storage_sync_loop(
     runtime: Runtime,
     conf: &'static PageServerConf,
-    (storage, index, sync_queue): (Arc<GenericRemoteStorage>, RemoteIndex, &SyncQueue),
+    (storage, index, sync_queue): (GenericRemoteStorage, RemoteIndex, &SyncQueue),
     max_sync_errors: NonZeroU32,
 ) {
     info!("Starting remote storage sync loop");
     loop {
-        let loop_storage = Arc::clone(&storage);
+        let loop_storage = storage.clone();
 
         let (batched_tasks, remaining_queue_length) = sync_queue.next_task_batch();
 
@@ -939,7 +939,7 @@ enum UploadStatus {
 async fn process_batches(
     conf: &'static PageServerConf,
     max_sync_errors: NonZeroU32,
-    storage: Arc<GenericRemoteStorage>,
+    storage: GenericRemoteStorage,
     index: &RemoteIndex,
     batched_tasks: HashMap<ZTenantTimelineId, SyncTaskBatch>,
     sync_queue: &SyncQueue,
 ) -> HashSet<ZTenantId> {
     let mut sync_results = batched_tasks
         .into_iter()
         .map(|(sync_id, batch)| {
-            let storage = Arc::clone(&storage);
+            let storage = storage.clone();
             let index = index.clone();
             async move {
                 let state_update = process_sync_task_batch(
@@ -981,7 +981,7 @@ async fn process_batches(
 
 async fn process_sync_task_batch(
     conf: &'static PageServerConf,
-    (storage, index, sync_queue): (Arc<GenericRemoteStorage>, RemoteIndex, &SyncQueue),
+    (storage, index, sync_queue): (GenericRemoteStorage, RemoteIndex, &SyncQueue),
     max_sync_errors: NonZeroU32,
     sync_id: ZTenantTimelineId,
     batch: SyncTaskBatch,
@@ -1009,7 +1009,7 @@ async fn process_sync_task_batch(
                     ControlFlow::Continue(()) => {
                         upload_timeline_data(
                             conf,
-                            (storage.as_ref(), &index, sync_queue),
+                            (&storage, &index, sync_queue),
                             current_remote_timeline.as_ref(),
                             sync_id,
                             upload_data,
@@ -1020,7 +1020,7 @@ async fn process_sync_task_batch(
                     }
                     ControlFlow::Break(()) => match update_remote_data(
                         conf,
-                        storage.as_ref(),
+                        &storage,
                         &index,
                         sync_id,
                         RemoteDataUpdate::Upload {
@@ -1053,7 +1053,7 @@ async fn process_sync_task_batch(
                     ControlFlow::Continue(()) => {
                         return download_timeline_data(
                             conf,
-                            (storage.as_ref(), &index, sync_queue),
+                            (&storage, &index, sync_queue),
                             current_remote_timeline.as_ref(),
                             sync_id,
                             download_data,
@@ -1086,7 +1086,7 @@ async fn process_sync_task_batch(
                 ControlFlow::Continue(()) => {
                     delete_timeline_data(
                         conf,
-                        (storage.as_ref(), &index, sync_queue),
+                        (&storage, &index, sync_queue),
                         sync_id,
                         delete_data,
                         sync_start,
@@ -1098,7 +1098,7 @@ async fn process_sync_task_batch(
                 ControlFlow::Break(()) => {
                     if let Err(e) = update_remote_data(
                         conf,
-                        storage.as_ref(),
+                        &storage,
                         &index,
                         sync_id,
                         RemoteDataUpdate::Delete(&delete_data.data.deleted_layers),
diff --git a/pageserver/src/storage_sync/delete.rs b/pageserver/src/storage_sync/delete.rs
index d80a082d0c..794ecbaeb3 100644
--- a/pageserver/src/storage_sync/delete.rs
+++ b/pageserver/src/storage_sync/delete.rs
@@ -7,15 +7,15 @@
 use futures::stream::{FuturesUnordered, StreamExt};
 use tracing::{debug, error, info};
 
 use crate::storage_sync::{SyncQueue, SyncTask};
-use remote_storage::{GenericRemoteStorage, RemoteStorage};
+use remote_storage::GenericRemoteStorage;
 use utils::zid::ZTenantTimelineId;
 
 use super::{LayersDeletion, SyncData};
 
 /// Attempts to remove the timleline layers from the remote storage.
 /// If the task had not adjusted the metadata before, the deletion will fail.
-pub(super) async fn delete_timeline_layers<'a>(
-    storage: &'a GenericRemoteStorage,
+pub(super) async fn delete_timeline_layers(
+    storage: &GenericRemoteStorage,
     sync_queue: &SyncQueue,
     sync_id: ZTenantTimelineId,
     mut delete_data: SyncData<LayersDeletion>,
@@ -43,14 +43,7 @@ pub(super) async fn delete_timeline_layers<'a>(
     let mut delete_tasks = layers_to_delete
         .into_iter()
         .map(|local_layer_path| async {
-            match match storage {
-                GenericRemoteStorage::Local(storage) => {
-                    remove_storage_object(storage, &local_layer_path).await
-                }
-                GenericRemoteStorage::S3(storage) => {
-                    remove_storage_object(storage, &local_layer_path).await
-                }
-            } {
+            match remove_storage_object(storage, &local_layer_path).await {
                 Ok(()) => Ok(local_layer_path),
                 Err(e) => Err((e, local_layer_path)),
             }
@@ -88,11 +81,10 @@ pub(super) async fn delete_timeline_layers<'a>(
     errored
 }
 
-async fn remove_storage_object<P, S>(storage: &S, local_layer_path: &Path) -> anyhow::Result<()>
-where
-    P: std::fmt::Debug + Send + Sync + 'static,
-    S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
-{
+async fn remove_storage_object(
+    storage: &GenericRemoteStorage,
+    local_layer_path: &Path,
+) -> anyhow::Result<()> {
     let storage_path = storage
         .remote_object_id(local_layer_path)
         .with_context(|| {
@@ -132,7 +124,7 @@ mod tests {
         let harness = RepoHarness::create("delete_timeline_negative")?;
         let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_path_buf(),
             harness.conf.workdir.clone(),
         )?);
@@ -167,7 +159,7 @@ mod tests {
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
         let layer_files = ["a", "b", "c", "d"];
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_path_buf(),
             harness.conf.workdir.clone(),
         )?);
@@ -180,7 +172,8 @@ mod tests {
         let timeline_upload =
             create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
         for local_path in timeline_upload.layers_to_upload {
-            let remote_path = local_storage.remote_object_id(&local_path)?;
+            let remote_path =
+                local_storage.resolve_in_storage(&local_storage.remote_object_id(&local_path)?)?;
             let remote_parent_dir = remote_path.parent().unwrap();
             if !remote_parent_dir.exists() {
                 fs::create_dir_all(&remote_parent_dir).await?;
diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs
index e11a863dcc..372ca0a463 100644
--- a/pageserver/src/storage_sync/download.rs
+++ b/pageserver/src/storage_sync/download.rs
@@ -9,9 +9,7 @@ use std::{
 
 use anyhow::Context;
 use futures::stream::{FuturesUnordered, StreamExt};
-use remote_storage::{
-    path_with_suffix_extension, DownloadError, GenericRemoteStorage, RemoteStorage,
-};
+use remote_storage::{path_with_suffix_extension, DownloadError, GenericRemoteStorage};
 use tokio::{
     fs,
     io::{self, AsyncWriteExt},
@@ -371,68 +369,6 @@ async fn get_timeline_sync_ids(
     tenant_path: &Path,
     tenant_id: ZTenantId,
 ) -> anyhow::Result<HashSet<ZTenantTimelineId>> {
-    let timeline_ids: Vec<ZTimelineId> = match storage {
-        GenericRemoteStorage::Local(storage) => list_prefixes(storage, tenant_path)
-            .await?
-            .into_iter()
-            .map(|timeline_directory_path| {
-                timeline_directory_path
-                    .file_stem()
-                    .with_context(|| {
-                        format!(
-                            "Failed to get timeline id string from file '{}'",
-                            timeline_directory_path.display()
-                        )
-                    })?
-                    .to_string_lossy()
-                    .as_ref()
-                    .parse()
-                    .with_context(|| {
-                        format!(
-                            "failed to parse directory name '{}' as timeline id",
-                            timeline_directory_path.display()
-                        )
-                    })
-            })
-            .collect::<anyhow::Result<Vec<_>>>(),
-        GenericRemoteStorage::S3(storage) => list_prefixes(storage, tenant_path)
-            .await?
-            .into_iter()
-            .map(|s3_path| {
-                s3_path
-                    .object_name()
-                    .with_context(|| {
-                        format!("Failed to get object name out of S3 path {s3_path:?}")
-                    })?
-                    .parse()
-                    .with_context(|| {
-                        format!("failed to parse object name '{s3_path:?}' as timeline id")
-                    })
-            })
-            .collect::<anyhow::Result<Vec<_>>>(),
-    }
-    .with_context(|| {
-        format!("Tenant {tenant_id} has at least one incorrect timeline subdirectory")
-    })?;
-
-    if timeline_ids.is_empty() {
-        anyhow::bail!("no timelines found on the remote storage for tenant {tenant_id}")
-    }
-
-    Ok(timeline_ids
-        .into_iter()
-        .map(|timeline_id| ZTenantTimelineId {
-            tenant_id,
-            timeline_id,
-        })
-        .collect())
-}
-
-async fn list_prefixes<P, S>(storage: &S, tenant_path: &Path) -> anyhow::Result<Vec<P>>
-where
-    P: std::fmt::Debug + Send + Sync + 'static,
-    S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
-{
     let tenant_storage_path = storage.remote_object_id(tenant_path).with_context(|| {
         format!(
             "Failed to get tenant storage path for local path '{}'",
@@ -440,14 +376,37 @@ where
         )
     })?;
 
-    storage
+    let timelines = storage
         .list_prefixes(Some(&tenant_storage_path))
         .await
         .with_context(|| {
             format!(
                 "Failed to list tenant storage path {tenant_storage_path:?} to get remote timelines to download"
             )
-        })
+        })?;
+
+    if timelines.is_empty() {
+        anyhow::bail!("no timelines found on the remote storage")
+    }
+
+    let mut sync_ids = HashSet::new();
+
+    for timeline_remote_storage_key in timelines {
+        let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
+            anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
+        })?;
+
+        let timeline_id: ZTimelineId = object_name.parse().with_context(|| {
+            format!("failed to parse object name into timeline id '{object_name}'")
+        })?;
+
+        sync_ids.insert(ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        });
+    }
+
+    Ok(sync_ids)
 }
 
 async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
@@ -459,6 +418,7 @@ mod tests {
     use std::{
         collections::{BTreeSet, HashSet},
         num::NonZeroUsize,
+        path::PathBuf,
     };
 
     use remote_storage::{LocalFs, RemoteStorage};
@@ -482,7 +442,7 @@ mod tests {
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
         let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_owned(),
             harness.conf.workdir.clone(),
         )?);
@@ -494,7 +454,8 @@ mod tests {
             create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
 
         for local_path in timeline_upload.layers_to_upload {
-            let remote_path = local_storage.remote_object_id(&local_path)?;
+            let remote_path =
+                local_storage.resolve_in_storage(&storage.remote_object_id(&local_path)?)?;
             let remote_parent_dir = remote_path.parent().unwrap();
             if !remote_parent_dir.exists() {
                 fs::create_dir_all(&remote_parent_dir).await?;
@@ -580,7 +541,7 @@ mod tests {
         let harness = RepoHarness::create("download_timeline_negatives")?;
         let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_owned(),
             harness.conf.workdir.clone(),
         )?);
@@ -639,7 +600,7 @@ mod tests {
         let harness = RepoHarness::create("test_download_index_part")?;
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
 
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_owned(),
             harness.conf.workdir.clone(),
         )?);
@@ -663,9 +624,10 @@ mod tests {
         let local_index_part_path =
             metadata_path(harness.conf, sync_id.timeline_id, sync_id.tenant_id)
                 .with_file_name(IndexPart::FILE_NAME);
-        let storage_path = local_storage.remote_object_id(&local_index_part_path)?;
-        fs::create_dir_all(storage_path.parent().unwrap()).await?;
-        fs::write(&storage_path, serde_json::to_vec(&index_part)?).await?;
+        let index_part_remote_id = local_storage.remote_object_id(&local_index_part_path)?;
+        let index_part_local_path = PathBuf::from(String::from(index_part_remote_id));
+        fs::create_dir_all(index_part_local_path.parent().unwrap()).await?;
+        fs::write(&index_part_local_path, serde_json::to_vec(&index_part)?).await?;
 
         let downloaded_index_part = download_index_part(harness.conf, &storage, sync_id).await?;
 
diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs
index 8dd73d9431..7070f941f5 100644
--- a/pageserver/src/storage_sync/upload.rs
+++ b/pageserver/src/storage_sync/upload.rs
@@ -34,7 +34,11 @@ pub(super) async fn upload_index_part(
     let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
         .with_file_name(IndexPart::FILE_NAME);
     storage
-        .upload_storage_object(index_part_bytes, index_part_size, &index_part_path)
+        .upload_storage_object(
+            Box::new(index_part_bytes),
+            index_part_size,
+            &index_part_path,
+        )
         .await
         .with_context(|| format!("Failed to upload index part for '{sync_id}'"))
 }
@@ -119,7 +123,7 @@ pub(super) async fn upload_timeline_layers<'a>(
             .len() as usize;
 
         match storage
-            .upload_storage_object(source_file, source_size, &source_path)
+            .upload_storage_object(Box::new(source_file), source_size, &source_path)
             .await
             .with_context(|| format!("Failed to upload layer file for {sync_id}"))
         {
@@ -214,8 +218,8 @@ mod tests {
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
         let layer_files = ["a", "b"];
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
-            tempdir()?.path().to_owned(),
+        let storage = GenericRemoteStorage::new(LocalFs::new(
+            tempdir()?.path().to_path_buf(),
             harness.conf.workdir.clone(),
         )?);
         let local_storage = storage.as_local().unwrap();
@@ -302,7 +306,7 @@ mod tests {
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
         let layer_files = ["a1", "b1"];
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_owned(),
             harness.conf.workdir.clone(),
         )?);
@@ -395,7 +399,7 @@ mod tests {
         let harness = RepoHarness::create("test_upload_index_part")?;
         let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
 
-        let storage = GenericRemoteStorage::Local(LocalFs::new(
+        let storage = GenericRemoteStorage::new(LocalFs::new(
             tempdir()?.path().to_owned(),
             harness.conf.workdir.clone(),
         )?);
@@ -431,13 +435,13 @@ mod tests {
         let index_part_path = storage_files.first().unwrap();
         assert_eq!(
-            index_part_path.file_name().and_then(|name| name.to_str()),
+            index_part_path.object_name(),
             Some(IndexPart::FILE_NAME),
             "Remote index part should have the correct name"
         );
-
-        let remote_index_part: IndexPart =
-            serde_json::from_slice(&fs::read(&index_part_path).await?)?;
+        let remote_index_part: IndexPart = serde_json::from_slice(
+            &fs::read(local_storage.resolve_in_storage(index_part_path)?).await?,
+        )?;
 
         assert_eq!(
             index_part, remote_index_part,
             "Remote index part should match the local one"
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index 7c82745142..041bd50737 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -134,7 +134,7 @@ impl fmt::Display for TenantState {
 /// are scheduled for download and added to the repository once download is completed.
 pub fn init_tenant_mgr(
     conf: &'static PageServerConf,
-    remote_storage: Option<Arc<GenericRemoteStorage>>,
+    remote_storage: Option<GenericRemoteStorage>,
 ) -> anyhow::Result<RemoteIndex> {
     let (timeline_updates_sender, timeline_updates_receiver) =
         mpsc::unbounded_channel::<LocalTimelineUpdate>();
 
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 6acc70e85a..5d946e37a4 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -127,7 +127,8 @@ async fn wal_backup_launcher_main_loop(
     let conf_ = conf.clone();
     REMOTE_STORAGE.get_or_init(|| {
         conf_.remote_storage.as_ref().map(|c| {
-            GenericRemoteStorage::new(conf_.workdir, c).expect("failed to create remote storage")
+            GenericRemoteStorage::from_config(conf_.workdir, c)
+                .expect("failed to create remote storage")
         })
     });
 
@@ -417,7 +418,11 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
 static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
 
 async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
-    let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
+    let storage = REMOTE_STORAGE
+        .get()
+        .expect("failed to get remote storage")
+        .as_ref()
+        .unwrap();
 
     let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
         format!(
@@ -427,9 +432,7 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
     })?);
 
     storage
-        .as_ref()
-        .expect("Storage should be initialized by launcher at this point.")
-        .upload_storage_object(file, size, source_file)
+        .upload_storage_object(Box::new(file), size, source_file)
         .await
 }
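
A consequence of making `RemoteStorage` object-safe is visible at every upload call site in this patch: trait objects cannot have generic methods, so `upload` and `upload_storage_object` take a boxed reader instead of `impl AsyncRead`, and callers such as `backup_object` wrap their readers in `Box::new`. A sketch of such a caller under those assumptions (the file path is invented):

    use std::path::Path;
    use remote_storage::GenericRemoteStorage;
    use tokio::{fs::File, io::BufReader};

    async fn backup_one(storage: &GenericRemoteStorage) -> anyhow::Result<()> {
        let source = Path::new("/data/wal/000000010000000000000001");
        let size = tokio::fs::metadata(source).await?.len() as usize;
        let reader = BufReader::new(File::open(source).await?);
        // The reader is boxed because the object-safe trait method cannot be
        // generic over the concrete AsyncRead type.
        storage
            .upload_storage_object(Box::new(reader), size, source)
            .await
    }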