refactor(ephemeral_file): reuse owned_buffers_io::BufferedWriter (#7484)

part of https://github.com/neondatabase/neon/issues/7124

Changes
-------

This PR replaces the `EphemeralFile::write_blob`-specifc `struct Writer`
with re-use of `owned_buffers_io::write::BufferedWriter`.

Further, it restructures the code to cleanly separate

* the high-level aspect of EphemeralFile's write_blob / read_blk API
* the page-caching aspect
* the aspect of IO
  * performing buffered write IO to an underlying VirtualFile
* serving reads from either the VirtualFile or the buffer if it hasn't
been flushed yet
* the annoying "feature" that reads past the end of the written range
are allowed and expected to return zeroed memory, as long as one remains
within one PAGE_SZ
This commit is contained in:
Christian Schwarz
2024-04-26 13:01:26 +02:00
committed by GitHub
parent bf369f4268
commit dbb0c967d5
10 changed files with 538 additions and 208 deletions

View File

@@ -14,6 +14,14 @@ impl<W> Writer<W> {
}
}
pub fn bytes_written(&self) -> u64 {
self.bytes_amount
}
pub fn as_inner(&self) -> &W {
&self.dst
}
/// Returns the wrapped `VirtualFile` object as well as the number
/// of bytes that were written to it through this object.
pub fn into_inner(self) -> (u64, W) {

View File

@@ -47,6 +47,15 @@ where
}
}
pub fn as_inner(&self) -> &W {
&self.writer
}
/// Panics if used after any of the write paths returned an error
pub fn inspect_buffer(&self) -> &B {
self.buf()
}
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
self.flush().await?;
let Self { buf, writer } = self;
@@ -100,6 +109,28 @@ where
Ok((chunk_len, chunk.into_inner()))
}
/// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
///
/// It is less performant because we always have to copy the borrowed data into the internal buffer
/// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant
/// for large writes.
pub async fn write_buffered_borrowed(&mut self, mut chunk: &[u8]) -> std::io::Result<usize> {
let chunk_len = chunk.len();
while !chunk.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = buf.cap() - buf.pending();
let have = chunk.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&chunk[..n]);
chunk = &chunk[n..];
if buf.pending() >= buf.cap() {
assert_eq!(buf.pending(), buf.cap());
self.flush().await?;
}
}
Ok(chunk_len)
}
async fn flush(&mut self) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
let buf_len = buf.pending();
@@ -266,4 +297,31 @@ mod tests {
);
Ok(())
}
#[tokio::test]
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
writer.write_buffered_borrowed(b"abc").await?;
writer.write_buffered_borrowed(b"d").await?;
writer.write_buffered_borrowed(b"e").await?;
writer.write_buffered_borrowed(b"fg").await?;
writer.write_buffered_borrowed(b"hi").await?;
writer.write_buffered_borrowed(b"j").await?;
writer.write_buffered_borrowed(b"klmno").await?;
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
{
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
expect
}
.iter()
.map(|v| v[..].to_vec())
.collect::<Vec<_>>()
);
Ok(())
}
}