From dec58092e8b7c20e743a584f3e6fa8aaca73c988 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 7 Apr 2023 21:39:49 +0300 Subject: [PATCH] Replace Box with impl in RemoteStorage upload (#3984) Replaces `Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>` with `impl io::AsyncRead + Unpin + Send + Sync + 'static` usages in the `RemoteStorage` interface, to make it closer to [`#![feature(async_fn_in_trait)]`](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html). For `GenericRemoteStorage`, replaces `type Target = dyn RemoteStorage` with another impl with `RemoteStorage` methods inside it. We could reuse the trait, but that would require importing the trait in every file where it's used and would move us farther from the unstable feature. After this PR, I've managed to create a patch with the changes: https://github.com/neondatabase/neon/compare/kb/less-dyn-storage...kb/nightly-async-trait?expand=1 The current Rust implementation does not like recursive async trait calls, so `UnreliableWrapper` was removed in that patch: it contained a `GenericRemoteStorage` that implemented the `RemoteStorage` trait, and itself implemented the trait, which nightly rustc did not like and proposed to box the future. Similarly, `GenericRemoteStorage` cannot implement `RemoteStorage` for nightly rustc to work, since it calls the various remote storages' methods from inside. I've compiled current `main` and the nightly branch both with `time env RUSTC_WRAPPER="" cargo +nightly build --all --timings` command, and got ``` Finished dev [optimized + debuginfo] target(s) in 2m 04s env RUSTC_WRAPPER="" cargo +nightly build --all --timings 1283.19s user 50.40s system 1074% cpu 2:04.15 total for the new feature tried and Finished dev [optimized + debuginfo] target(s) in 2m 40s env RUSTC_WRAPPER="" cargo +nightly build --all --timings 1288.59s user 52.06s system 834% cpu 2:40.71 total for the old async_trait approach. 
``` On my machine, the `remote_storage` lib compilation takes ~10 less time with the nightly feature (left) than the regular main (right). ![image](https://user-images.githubusercontent.com/2690773/230620797-163d8b89-dac8-4366-bcf6-cd1cdddcd22c.png) Full cargo reports are available at [timings.zip](https://github.com/neondatabase/neon/files/11179369/timings.zip) --- libs/remote_storage/src/lib.rs | 72 +++++++++++++++++--- libs/remote_storage/src/local_fs.rs | 2 +- libs/remote_storage/src/s3_bucket.rs | 2 +- libs/remote_storage/src/simulate_failures.rs | 2 +- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 5b74308514..e0cc3ca543 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -13,7 +13,6 @@ use std::{ collections::HashMap, fmt::Debug, num::{NonZeroU32, NonZeroUsize}, - ops::Deref, path::{Path, PathBuf}, pin::Pin, sync::Arc, @@ -90,7 +89,7 @@ pub trait RemoteStorage: Send + Sync + 'static { /// Streams the local file contents into remote into the remote storage entry. async fn upload( &self, - data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, // S3 PUT request requires the content length to be specified, // otherwise it starts to fail with the concurrent connection count increasing. 
data_size_bytes: usize, @@ -161,14 +160,67 @@ pub enum GenericRemoteStorage { Unreliable(Arc), } -impl Deref for GenericRemoteStorage { - type Target = dyn RemoteStorage; - - fn deref(&self) -> &Self::Target { +impl GenericRemoteStorage { + pub async fn list_prefixes( + &self, + prefix: Option<&RemotePath>, + ) -> Result, DownloadError> { match self { - GenericRemoteStorage::LocalFs(local_fs) => local_fs, - GenericRemoteStorage::AwsS3(s3_bucket) => s3_bucket.as_ref(), - GenericRemoteStorage::Unreliable(s) => s.as_ref(), + Self::LocalFs(s) => s.list_prefixes(prefix).await, + Self::AwsS3(s) => s.list_prefixes(prefix).await, + Self::Unreliable(s) => s.list_prefixes(prefix).await, + } + } + + pub async fn upload( + &self, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + ) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await, + Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await, + Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await, + } + } + + pub async fn download(&self, from: &RemotePath) -> Result { + match self { + Self::LocalFs(s) => s.download(from).await, + Self::AwsS3(s) => s.download(from).await, + Self::Unreliable(s) => s.download(from).await, + } + } + + pub async fn download_byte_range( + &self, + from: &RemotePath, + start_inclusive: u64, + end_exclusive: Option, + ) -> Result { + match self { + Self::LocalFs(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } + Self::AwsS3(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } + Self::Unreliable(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } + } + } + + pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => s.delete(path).await, + Self::AwsS3(s) => s.delete(path).await, + 
Self::Unreliable(s) => s.delete(path).await, } } } @@ -199,7 +251,7 @@ impl GenericRemoteStorage { /// this path is used for the remote object id conversion only. pub async fn upload_storage_object( &self, - from: Box, + from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, from_size_bytes: usize, to: &RemotePath, ) -> anyhow::Result<()> { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 21a4156ad3..d7b46731cd 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -118,7 +118,7 @@ impl RemoteStorage for LocalFs { async fn upload( &self, - data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + data: impl io::AsyncRead + Unpin + Send + Sync + 'static, data_size_bytes: usize, to: &RemotePath, metadata: Option, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index fdf3ae02d3..e6c1e19ad5 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -343,7 +343,7 @@ impl RemoteStorage for S3Bucket { async fn upload( &self, - from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + from: impl io::AsyncRead + Unpin + Send + Sync + 'static, from_size_bytes: usize, to: &RemotePath, metadata: Option, diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index d1d062f8e7..cb40859831 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -84,7 +84,7 @@ impl RemoteStorage for UnreliableWrapper { async fn upload( &self, - data: Box<(dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static)>, + data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, // S3 PUT request requires the content length to be specified, // otherwise it starts to fail with the concurrent connection count increasing. data_size_bytes: usize,