mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 23:59:58 +00:00
refactor(owned_buffer_io::util::size_tracking_writer): make generic over underlying writer (#7483)
part of https://github.com/neondatabase/neon/issues/7124
This commit is contained in:
committed by
GitHub
parent
70f4a16a05
commit
bf369f4268
@@ -183,6 +183,7 @@ async fn download_object<'a>(
|
|||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
|
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
|
||||||
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
|
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
|
||||||
|
use bytes::BytesMut;
|
||||||
async {
|
async {
|
||||||
let destination_file = VirtualFile::create(dst_path)
|
let destination_file = VirtualFile::create(dst_path)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ pub use io_engine::feature_test as io_engine_feature_test;
|
|||||||
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
|
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
|
||||||
mod metadata;
|
mod metadata;
|
||||||
mod open_options;
|
mod open_options;
|
||||||
|
use self::owned_buffers_io::write::OwnedAsyncWriter;
|
||||||
pub(crate) use io_engine::IoEngineKind;
|
pub(crate) use io_engine::IoEngineKind;
|
||||||
pub(crate) use metadata::Metadata;
|
pub(crate) use metadata::Metadata;
|
||||||
pub(crate) use open_options::*;
|
pub(crate) use open_options::*;
|
||||||
@@ -1083,6 +1084,17 @@ impl Drop for VirtualFile {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl OwnedAsyncWriter for VirtualFile {
|
||||||
|
#[inline(always)]
|
||||||
|
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||||
|
&mut self,
|
||||||
|
buf: B,
|
||||||
|
) -> std::io::Result<(usize, B::Buf)> {
|
||||||
|
let (buf, res) = VirtualFile::write_all(self, buf).await;
|
||||||
|
res.map(move |v| (v, buf))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl OpenFiles {
|
impl OpenFiles {
|
||||||
fn new(num_slots: usize) -> OpenFiles {
|
fn new(num_slots: usize) -> OpenFiles {
|
||||||
let mut slots = Box::new(Vec::with_capacity(num_slots));
|
let mut slots = Box::new(Vec::with_capacity(num_slots));
|
||||||
|
|||||||
@@ -1,33 +1,36 @@
|
|||||||
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
|
use crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter;
|
||||||
use tokio_epoll_uring::{BoundedBuf, IoBuf};
|
use tokio_epoll_uring::{BoundedBuf, IoBuf};
|
||||||
|
|
||||||
pub struct Writer {
|
pub struct Writer<W> {
|
||||||
dst: VirtualFile,
|
dst: W,
|
||||||
bytes_amount: u64,
|
bytes_amount: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Writer {
|
impl<W> Writer<W> {
|
||||||
pub fn new(dst: VirtualFile) -> Self {
|
pub fn new(dst: W) -> Self {
|
||||||
Self {
|
Self {
|
||||||
dst,
|
dst,
|
||||||
bytes_amount: 0,
|
bytes_amount: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the wrapped `VirtualFile` object as well as the number
|
/// Returns the wrapped `VirtualFile` object as well as the number
|
||||||
/// of bytes that were written to it through this object.
|
/// of bytes that were written to it through this object.
|
||||||
pub fn into_inner(self) -> (u64, VirtualFile) {
|
pub fn into_inner(self) -> (u64, W) {
|
||||||
(self.bytes_amount, self.dst)
|
(self.bytes_amount, self.dst)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OwnedAsyncWriter for Writer {
|
impl<W> OwnedAsyncWriter for Writer<W>
|
||||||
|
where
|
||||||
|
W: OwnedAsyncWriter,
|
||||||
|
{
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||||
&mut self,
|
&mut self,
|
||||||
buf: B,
|
buf: B,
|
||||||
) -> std::io::Result<(usize, B::Buf)> {
|
) -> std::io::Result<(usize, B::Buf)> {
|
||||||
let (buf, res) = self.dst.write_all(buf).await;
|
let (nwritten, buf) = self.dst.write_all(buf).await?;
|
||||||
let nwritten = res?;
|
|
||||||
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
||||||
Ok((nwritten, buf))
|
Ok((nwritten, buf))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user