Replace Box<dyn> with impl in RemoteStorage upload (#3984)

Replaces `Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>` with
`impl io::AsyncRead + Unpin + Send + Sync + 'static` usages in the
`RemoteStorage` interface, to make it closer to
[`#![feature(async_fn_in_trait)]`](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html)

For `GenericRemoteStorage`, replaces `type Target = dyn RemoteStorage`
with another impl with `RemoteStorage` methods inside it.
We can reuse the trait, that would require importing the trait in every
file where it's used and makes us farther from the unstable feature.
After this PR, I've manged to create a patch with the changes:

https://github.com/neondatabase/neon/compare/kb/less-dyn-storage...kb/nightly-async-trait?expand=1

Current rust implementation does not like recursive async trait calls,
so `UnreliableWrapper` was removed: it contained a
`GenericRemoteStorage` that implemented the `RemoteStorage` trait, and
itself implemented the trait, which nightly rustc did not like and
proposed to box the future.
Similarly, `GenericRemoteStorage` cannot implement `RemoteStorage` for
nightly rustc to work, since calls various remote storages' methods from
inside.

I've compiled current `main` and the nightly branch both with `time env
RUSTC_WRAPPER="" cargo +nightly build --all --timings` command, and got
```
    Finished dev [optimized + debuginfo] target(s) in 2m 04s
env RUSTC_WRAPPER="" cargo +nightly build --all --timings  1283.19s user 50.40s system 1074% cpu 2:04.15 total

for the new feature tried and

    Finished dev [optimized + debuginfo] target(s) in 2m 40s
env RUSTC_WRAPPER="" cargo +nightly build --all --timings  1288.59s user 52.06s system 834% cpu 2:40.71 total

for the old async_trait approach.
```

On my machine, the `remote_storage` lib compilation takes ~10 less time
with the nightly feature (left) than the regular main (right).


![image](https://user-images.githubusercontent.com/2690773/230620797-163d8b89-dac8-4366-bcf6-cd1cdddcd22c.png)

Full cargo reports are available at
[timings.zip](https://github.com/neondatabase/neon/files/11179369/timings.zip)
This commit is contained in:
Kirill Bulatov
2023-04-07 21:39:49 +03:00
committed by GitHub
parent 0bf70e113f
commit dec58092e8
4 changed files with 65 additions and 13 deletions

View File

@@ -13,7 +13,6 @@ use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::Deref,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
@@ -90,7 +89,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
&self,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
@@ -161,14 +160,67 @@ pub enum GenericRemoteStorage {
Unreliable(Arc<UnreliableWrapper>),
}
impl Deref for GenericRemoteStorage {
type Target = dyn RemoteStorage;
fn deref(&self) -> &Self::Target {
impl GenericRemoteStorage {
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
GenericRemoteStorage::LocalFs(local_fs) => local_fs,
GenericRemoteStorage::AwsS3(s3_bucket) => s3_bucket.as_ref(),
GenericRemoteStorage::Unreliable(s) => s.as_ref(),
Self::LocalFs(s) => s.list_prefixes(prefix).await,
Self::AwsS3(s) => s.list_prefixes(prefix).await,
Self::Unreliable(s) => s.list_prefixes(prefix).await,
}
}
pub async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
}
}
pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => s.download(from).await,
Self::AwsS3(s) => s.download(from).await,
Self::Unreliable(s) => s.download(from).await,
}
}
pub async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::AwsS3(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::Unreliable(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
}
}
pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.delete(path).await,
Self::AwsS3(s) => s.delete(path).await,
Self::Unreliable(s) => s.delete(path).await,
}
}
}
@@ -199,7 +251,7 @@ impl GenericRemoteStorage {
/// this path is used for the remote object id conversion only.
pub async fn upload_storage_object(
&self,
from: Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static>,
from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
) -> anyhow::Result<()> {

View File

@@ -118,7 +118,7 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,

View File

@@ -343,7 +343,7 @@ impl RemoteStorage for S3Bucket {
async fn upload(
&self,
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,

View File

@@ -84,7 +84,7 @@ impl RemoteStorage for UnreliableWrapper {
async fn upload(
&self,
data: Box<(dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static)>,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,