diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 5b74308514..e0cc3ca543 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -13,7 +13,6 @@ use std::{ collections::HashMap, fmt::Debug, num::{NonZeroU32, NonZeroUsize}, - ops::Deref, path::{Path, PathBuf}, pin::Pin, sync::Arc, @@ -90,7 +89,7 @@ pub trait RemoteStorage: Send + Sync + 'static { /// Streams the local file contents into remote into the remote storage entry. async fn upload( &self, - data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, // S3 PUT request requires the content length to be specified, // otherwise it starts to fail with the concurrent connection count increasing. data_size_bytes: usize, @@ -161,14 +160,67 @@ pub enum GenericRemoteStorage { Unreliable(Arc), } -impl Deref for GenericRemoteStorage { - type Target = dyn RemoteStorage; - - fn deref(&self) -> &Self::Target { +impl GenericRemoteStorage { + pub async fn list_prefixes( + &self, + prefix: Option<&RemotePath>, + ) -> Result, DownloadError> { match self { - GenericRemoteStorage::LocalFs(local_fs) => local_fs, - GenericRemoteStorage::AwsS3(s3_bucket) => s3_bucket.as_ref(), - GenericRemoteStorage::Unreliable(s) => s.as_ref(), + Self::LocalFs(s) => s.list_prefixes(prefix).await, + Self::AwsS3(s) => s.list_prefixes(prefix).await, + Self::Unreliable(s) => s.list_prefixes(prefix).await, + } + } + + pub async fn upload( + &self, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + ) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await, + Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await, + Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await, + } + } + + pub async fn download(&self, from: &RemotePath) -> Result { + match self { + Self::LocalFs(s) => s.download(from).await, + Self::AwsS3(s) => s.download(from).await, + Self::Unreliable(s) => s.download(from).await, + } + } + + pub async fn download_byte_range( + &self, + from: &RemotePath, + start_inclusive: u64, + end_exclusive: Option, + ) -> Result { + match self { + Self::LocalFs(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } + Self::AwsS3(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } + Self::Unreliable(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } + } + } + + pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.delete(path).await, + Self::AwsS3(s) => s.delete(path).await, + Self::Unreliable(s) => s.delete(path).await, } } } @@ -199,7 +251,7 @@ impl GenericRemoteStorage { /// this path is used for the remote object id conversion only. pub async fn upload_storage_object( &self, - from: Box, + from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, from_size_bytes: usize, to: &RemotePath, ) -> anyhow::Result<()> { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 21a4156ad3..d7b46731cd 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -118,7 +118,7 @@ impl RemoteStorage for LocalFs { async fn upload( &self, - data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + data: impl io::AsyncRead + Unpin + Send + Sync + 'static, data_size_bytes: usize, to: &RemotePath, metadata: Option, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index fdf3ae02d3..e6c1e19ad5 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -343,7 +343,7 @@ impl RemoteStorage for S3Bucket { async fn upload( &self, - from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, from_size_bytes: usize, to: &RemotePath, metadata: Option, diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index d1d062f8e7..cb40859831 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -84,7 +84,7 @@ impl RemoteStorage for UnreliableWrapper { async fn upload( &self, - data: Box<(dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static)>, + data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, // S3 PUT request requires the content length to be specified, // otherwise it starts to fail with the concurrent connection count increasing. data_size_bytes: usize,