From b2d78dc07a07f3c0ab44b29da88c23948011008c Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 7 Apr 2023 16:35:54 +0300 Subject: [PATCH] Use a few hacks to use the nightly async trait --- libs/remote_storage/src/lib.rs | 24 +++--- libs/remote_storage/src/local_fs.rs | 1 - libs/remote_storage/src/s3_bucket.rs | 1 - libs/remote_storage/src/simulate_failures.rs | 87 ++++++++++---------- 4 files changed, 56 insertions(+), 57 deletions(-) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e0cc3ca543..bb72577b28 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(async_fn_in_trait)] + //! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage. //! No other modules from this tree are supposed to be used directly by the external code. //! @@ -75,7 +77,6 @@ impl RemotePath { /// Storage (potentially remote) API to manage its state. /// This storage tries to be unaware of any layered repository context, /// providing basic CRUD operations for storage files. -#[async_trait::async_trait] pub trait RemoteStorage: Send + Sync + 'static { /// Lists all top level subdirectories for a given prefix /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id @@ -157,7 +158,7 @@ impl std::error::Error for DownloadError {} pub enum GenericRemoteStorage { LocalFs(LocalFs), AwsS3(Arc), - Unreliable(Arc), + // Unreliable(Arc), } impl GenericRemoteStorage { @@ -168,7 +169,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.list_prefixes(prefix).await, Self::AwsS3(s) => s.list_prefixes(prefix).await, - Self::Unreliable(s) => s.list_prefixes(prefix).await, + // Self::Unreliable(s) => s.list_prefixes(prefix).await, } } @@ -182,7 +183,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await, Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await, - Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await, + // Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await, } } @@ -190,7 +191,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.download(from).await, Self::AwsS3(s) => s.download(from).await, - Self::Unreliable(s) => s.download(from).await, + // Self::Unreliable(s) => s.download(from).await, } } @@ -209,10 +210,10 @@ impl GenericRemoteStorage { s.download_byte_range(from, start_inclusive, end_exclusive) .await } - Self::Unreliable(s) => { - s.download_byte_range(from, start_inclusive, end_exclusive) - .await - } + // Self::Unreliable(s) => { + // s.download_byte_range(from, start_inclusive, end_exclusive) + // .await + // } } } @@ -220,7 +221,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.delete(path).await, Self::AwsS3(s) => s.delete(path).await, - Self::Unreliable(s) => s.delete(path).await, + // Self::Unreliable(s) => s.delete(path).await, } } } @@ -241,7 +242,8 @@ impl GenericRemoteStorage { } pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self { - Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first))) + // Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first))) + unimplemented!("TODO kb") } /// Takes storage object contents and its size and uploads to remote storage, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index d7b46731cd..68403920be 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -91,7 +91,6 @@ impl LocalFs { } } -#[async_trait::async_trait] impl RemoteStorage for LocalFs { async fn list_prefixes( &self, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index e6c1e19ad5..ef22cec2a7 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -273,7 +273,6 @@ impl AsyncRead for RatelimitedAsyncRead { } } -#[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` /// Note: it wont include empty "directories" diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index cb40859831..8da23854d8 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -72,51 +72,50 @@ impl UnreliableWrapper { } } -#[async_trait::async_trait] -impl RemoteStorage for UnreliableWrapper { - async fn list_prefixes( - &self, - prefix: Option<&RemotePath>, - ) -> Result, DownloadError> { - self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?; - self.inner.list_prefixes(prefix).await - } +// impl RemoteStorage for UnreliableWrapper { +// async fn list_prefixes( +// &self, +// prefix: Option<&RemotePath>, +// ) -> Result, DownloadError> { +// self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?; +// self.inner.list_prefixes(prefix).await +// } - async fn upload( - &self, - data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, - // S3 PUT request requires the content length to be specified, - // otherwise it starts to fail with the concurrent connection count increasing. - data_size_bytes: usize, - to: &RemotePath, - metadata: Option, - ) -> anyhow::Result<()> { - self.attempt(RemoteOp::Upload(to.clone()))?; - self.inner.upload(data, data_size_bytes, to, metadata).await - } +// async fn upload( +// &self, +// data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, +// // S3 PUT request requires the content length to be specified, +// // otherwise it starts to fail with the concurrent connection count increasing. +// data_size_bytes: usize, +// to: &RemotePath, +// metadata: Option, +// ) -> anyhow::Result<()> { +// self.attempt(RemoteOp::Upload(to.clone()))?; +// self.inner.upload(data, data_size_bytes, to, metadata).await +// } - async fn download(&self, from: &RemotePath) -> Result { - self.attempt(RemoteOp::Download(from.clone()))?; - self.inner.download(from).await - } +// async fn download(&self, from: &RemotePath) -> Result { +// self.attempt(RemoteOp::Download(from.clone()))?; +// self.inner.download(from).await +// } - async fn download_byte_range( - &self, - from: &RemotePath, - start_inclusive: u64, - end_exclusive: Option, - ) -> Result { - // Note: We treat any download_byte_range as an "attempt" of the same - // operation. We don't pay attention to the ranges. That's good enough - // for now. - self.attempt(RemoteOp::Download(from.clone()))?; - self.inner - .download_byte_range(from, start_inclusive, end_exclusive) - .await - } +// async fn download_byte_range( +// &self, +// from: &RemotePath, +// start_inclusive: u64, +// end_exclusive: Option, +// ) -> Result { +// // Note: We treat any download_byte_range as an "attempt" of the same +// // operation. We don't pay attention to the ranges. That's good enough +// // for now. +// self.attempt(RemoteOp::Download(from.clone()))?; +// self.inner +// .download_byte_range(from, start_inclusive, end_exclusive) +// .await +// } - async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - self.attempt(RemoteOp::Delete(path.clone()))?; - self.inner.delete(path).await - } -} +// async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { +// self.attempt(RemoteOp::Delete(path.clone()))?; +// self.inner.delete(path).await +// } +// }