From bf369f4268f839b5228dd1d65d822280d50401c8 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 Apr 2024 11:19:41 +0200 Subject: [PATCH] refactor(owned_buffer_io::util::size_tracking_writer): make generic over underlying writer (#7483) part of https://github.com/neondatabase/neon/issues/7124 --- .../tenant/remote_timeline_client/download.rs | 1 + pageserver/src/virtual_file.rs | 12 +++++++++++ .../util/size_tracking_writer.rs | 21 +++++++++++-------- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 7bf2d2de10..3744eecab5 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -183,6 +183,7 @@ async fn download_object<'a>( #[cfg(target_os = "linux")] crate::virtual_file::io_engine::IoEngine::TokioEpollUring => { use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer}; + use bytes::BytesMut; async { let destination_file = VirtualFile::create(dst_path) .await diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 0cf6a0019b..1d43a94568 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -32,6 +32,7 @@ pub use io_engine::feature_test as io_engine_feature_test; pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult; mod metadata; mod open_options; +use self::owned_buffers_io::write::OwnedAsyncWriter; pub(crate) use io_engine::IoEngineKind; pub(crate) use metadata::Metadata; pub(crate) use open_options::*; @@ -1083,6 +1084,17 @@ impl Drop for VirtualFile { } } +impl OwnedAsyncWriter for VirtualFile { + #[inline(always)] + async fn write_all, Buf: IoBuf + Send>( + &mut self, + buf: B, + ) -> std::io::Result<(usize, B::Buf)> { + let (buf, res) = VirtualFile::write_all(self, buf).await; + res.map(move |v| (v, buf)) + } +} + impl OpenFiles { fn new(num_slots: usize) -> OpenFiles { let mut slots = Box::new(Vec::with_capacity(num_slots)); diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs index 7505b7487e..edb11c5f4c 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -1,33 +1,36 @@ -use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile}; +use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter; use tokio_epoll_uring::{BoundedBuf, IoBuf}; -pub struct Writer { - dst: VirtualFile, +pub struct Writer { + dst: W, bytes_amount: u64, } -impl Writer { - pub fn new(dst: VirtualFile) -> Self { +impl Writer { + pub fn new(dst: W) -> Self { Self { dst, bytes_amount: 0, } } + /// Returns the wrapped `VirtualFile` object as well as the number /// of bytes that were written to it through this object. - pub fn into_inner(self) -> (u64, VirtualFile) { + pub fn into_inner(self) -> (u64, W) { (self.bytes_amount, self.dst) } } -impl OwnedAsyncWriter for Writer { +impl OwnedAsyncWriter for Writer +where + W: OwnedAsyncWriter, +{ #[inline(always)] async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, ) -> std::io::Result<(usize, B::Buf)> { - let (buf, res) = self.dst.write_all(buf).await; - let nwritten = res?; + let (nwritten, buf) = self.dst.write_all(buf).await?; self.bytes_amount += u64::try_from(nwritten).unwrap(); Ok((nwritten, buf)) }