Use a few hacks to use the nightly async trait

This commit is contained in:
Kirill Bulatov
2023-04-07 16:35:54 +03:00
parent dec58092e8
commit b2d78dc07a
4 changed files with 56 additions and 57 deletions

View File

@@ -1,3 +1,5 @@
#![feature(async_fn_in_trait)]
//! A set of generic storage abstractions for the page server to use when backing up and restoring its state from the external storage.
//! No other modules from this tree are supposed to be used directly by the external code.
//!
@@ -75,7 +77,6 @@ impl RemotePath {
/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync + 'static {
/// Lists all top level subdirectories for a given prefix
/// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
@@ -157,7 +158,7 @@ impl std::error::Error for DownloadError {}
pub enum GenericRemoteStorage {
LocalFs(LocalFs),
AwsS3(Arc<S3Bucket>),
Unreliable(Arc<UnreliableWrapper>),
// Unreliable(Arc<UnreliableWrapper>),
}
impl GenericRemoteStorage {
@@ -168,7 +169,7 @@ impl GenericRemoteStorage {
match self {
Self::LocalFs(s) => s.list_prefixes(prefix).await,
Self::AwsS3(s) => s.list_prefixes(prefix).await,
Self::Unreliable(s) => s.list_prefixes(prefix).await,
// Self::Unreliable(s) => s.list_prefixes(prefix).await,
}
}
@@ -182,7 +183,7 @@ impl GenericRemoteStorage {
match self {
Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
// Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
}
}
@@ -190,7 +191,7 @@ impl GenericRemoteStorage {
match self {
Self::LocalFs(s) => s.download(from).await,
Self::AwsS3(s) => s.download(from).await,
Self::Unreliable(s) => s.download(from).await,
// Self::Unreliable(s) => s.download(from).await,
}
}
@@ -209,10 +210,10 @@ impl GenericRemoteStorage {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::Unreliable(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
// Self::Unreliable(s) => {
// s.download_byte_range(from, start_inclusive, end_exclusive)
// .await
// }
}
}
@@ -220,7 +221,7 @@ impl GenericRemoteStorage {
match self {
Self::LocalFs(s) => s.delete(path).await,
Self::AwsS3(s) => s.delete(path).await,
Self::Unreliable(s) => s.delete(path).await,
// Self::Unreliable(s) => s.delete(path).await,
}
}
}
@@ -241,7 +242,8 @@ impl GenericRemoteStorage {
}
pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
// Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
unimplemented!("TODO kb")
}
/// Takes storage object contents and its size and uploads to remote storage,

View File

@@ -91,7 +91,6 @@ impl LocalFs {
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list_prefixes(
&self,

View File

@@ -273,7 +273,6 @@ impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
}
}
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it wont include empty "directories"

View File

@@ -72,51 +72,50 @@ impl UnreliableWrapper {
}
}
#[async_trait::async_trait]
impl RemoteStorage for UnreliableWrapper {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.inner.list_prefixes(prefix).await
}
// impl RemoteStorage for UnreliableWrapper {
// async fn list_prefixes(
// &self,
// prefix: Option<&RemotePath>,
// ) -> Result<Vec<RemotePath>, DownloadError> {
// self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
// self.inner.list_prefixes(prefix).await
// }
async fn upload(
&self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.upload(data, data_size_bytes, to, metadata).await
}
// async fn upload(
// &self,
// data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
// // S3 PUT request requires the content length to be specified,
// // otherwise it starts to fail with the concurrent connection count increasing.
// data_size_bytes: usize,
// to: &RemotePath,
// metadata: Option<StorageMetadata>,
// ) -> anyhow::Result<()> {
// self.attempt(RemoteOp::Upload(to.clone()))?;
// self.inner.upload(data, data_size_bytes, to, metadata).await
// }
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
self.attempt(RemoteOp::Download(from.clone()))?;
self.inner.download(from).await
}
// async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
// self.attempt(RemoteOp::Download(from.clone()))?;
// self.inner.download(from).await
// }
async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
// Note: We treat any download_byte_range as an "attempt" of the same
// operation. We don't pay attention to the ranges. That's good enough
// for now.
self.attempt(RemoteOp::Download(from.clone()))?;
self.inner
.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
// async fn download_byte_range(
// &self,
// from: &RemotePath,
// start_inclusive: u64,
// end_exclusive: Option<u64>,
// ) -> Result<Download, DownloadError> {
// // Note: We treat any download_byte_range as an "attempt" of the same
// // operation. We don't pay attention to the ranges. That's good enough
// // for now.
// self.attempt(RemoteOp::Download(from.clone()))?;
// self.inner
// .download_byte_range(from, start_inclusive, end_exclusive)
// .await
// }
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
self.attempt(RemoteOp::Delete(path.clone()))?;
self.inner.delete(path).await
}
}
// async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
// self.attempt(RemoteOp::Delete(path.clone()))?;
// self.inner.delete(path).await
// }
// }