mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 23:59:58 +00:00
refactor(virtual_file) make write_all_at take owned buffers (#6673)
context: https://github.com/neondatabase/neon/issues/6663 Building atop #6664, this PR switches `write_all_at` to take owned buffers. The main challenge here is the `EphemeralFile::mutable_tail`, for which I'm picking the ugly solution of an `Option` that is `None` while the IO is in flight. After this, we will be able to switch `write_at` to take owned buffers and call tokio-epoll-uring's `write` function with that owned buffer. That'll be done in #6378.
This commit is contained in:
committed by
GitHub
parent
df5d588f63
commit
774a6e7475
@@ -6,6 +6,7 @@ use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::cmp::min;
|
||||
@@ -26,7 +27,10 @@ pub struct EphemeralFile {
|
||||
/// An ephemeral file is append-only.
|
||||
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
|
||||
/// The other pages, which can no longer be modified, are accessed through the page cache.
|
||||
mutable_tail: [u8; PAGE_SZ],
|
||||
///
|
||||
/// None <=> IO is ongoing.
|
||||
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
|
||||
mutable_tail: Option<BytesMut>,
|
||||
}
|
||||
|
||||
impl EphemeralFile {
|
||||
@@ -60,7 +64,7 @@ impl EphemeralFile {
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
len: 0,
|
||||
mutable_tail: [0u8; PAGE_SZ],
|
||||
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -103,7 +107,13 @@ impl EphemeralFile {
|
||||
};
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
Ok(BlockLease::EphemeralFileMutableTail(
|
||||
self.mutable_tail
|
||||
.as_deref()
|
||||
.expect("we're not doing IO, it must be Some()")
|
||||
.try_into()
|
||||
.expect("we ensure that it's always PAGE_SZ"),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,21 +145,27 @@ impl EphemeralFile {
|
||||
) -> Result<(), io::Error> {
|
||||
let mut src_remaining = src;
|
||||
while !src_remaining.is_empty() {
|
||||
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
|
||||
let dst_remaining = &mut self
|
||||
.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref_mut()
|
||||
.expect("IO is not yet ongoing")[self.off..];
|
||||
let n = min(dst_remaining.len(), src_remaining.len());
|
||||
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
|
||||
self.off += n;
|
||||
src_remaining = &src_remaining[n..];
|
||||
if self.off == PAGE_SZ {
|
||||
match self
|
||||
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
|
||||
.expect("IO is not yet ongoing");
|
||||
let (mutable_tail, res) = self
|
||||
.ephemeral_file
|
||||
.file
|
||||
.write_all_at(
|
||||
&self.ephemeral_file.mutable_tail,
|
||||
self.blknum as u64 * PAGE_SZ as u64,
|
||||
)
|
||||
.await
|
||||
{
|
||||
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
|
||||
.await;
|
||||
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
|
||||
// I.e., the IO isn't retryable if we panic.
|
||||
self.ephemeral_file.mutable_tail = Some(mutable_tail);
|
||||
match res {
|
||||
Ok(_) => {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
@@ -169,7 +185,12 @@ impl EphemeralFile {
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
buf.copy_from_slice(
|
||||
self.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref()
|
||||
.expect("IO is not ongoing"),
|
||||
);
|
||||
let _ = write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
@@ -181,7 +202,11 @@ impl EphemeralFile {
|
||||
// Zero the buffer for re-use.
|
||||
// Zeroing is critical for correcntess because the write_blob code below
|
||||
// and similarly read_blk expect zeroed pages.
|
||||
self.ephemeral_file.mutable_tail.fill(0);
|
||||
self.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref_mut()
|
||||
.expect("IO is not ongoing")
|
||||
.fill(0);
|
||||
// This block is done, move to next one.
|
||||
self.blknum += 1;
|
||||
self.off = 0;
|
||||
|
||||
@@ -568,24 +568,37 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
|
||||
pub async fn write_all_at<B: BoundedBuf>(
|
||||
&self,
|
||||
buf: B,
|
||||
mut offset: u64,
|
||||
) -> (B::Buf, Result<(), Error>) {
|
||||
let buf_len = buf.bytes_init();
|
||||
if buf_len == 0 {
|
||||
return (Slice::into_inner(buf.slice_full()), Ok(()));
|
||||
}
|
||||
let mut buf = buf.slice(0..buf_len);
|
||||
while !buf.is_empty() {
|
||||
match self.write_at(buf, offset).await {
|
||||
// TODO: push `buf` further down
|
||||
match self.write_at(&buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
return (
|
||||
Slice::into_inner(buf),
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
)),
|
||||
);
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
buf = buf.slice(n..);
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
Err(e) => return (Slice::into_inner(buf), Err(e)),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
(Slice::into_inner(buf), Ok(()))
|
||||
}
|
||||
|
||||
/// Writes `buf.slice(0..buf.bytes_init())`.
|
||||
@@ -1050,10 +1063,19 @@ mod tests {
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
|
||||
}
|
||||
}
|
||||
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
|
||||
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let (_buf, res) = file.write_all_at(buf, offset).await;
|
||||
res
|
||||
}
|
||||
MaybeVirtualFile::File(file) => {
|
||||
let buf_len = buf.bytes_init();
|
||||
if buf_len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
file.write_all_at(&buf.slice(0..buf_len), offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
@@ -1200,8 +1222,8 @@ mod tests {
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
file_b.write_all_at(b"BAR", 3).await?;
|
||||
file_b.write_all_at(b"FOO", 0).await?;
|
||||
file_b.write_all_at(b"BAR".to_vec(), 3).await?;
|
||||
file_b.write_all_at(b"FOO".to_vec(), 0).await?;
|
||||
|
||||
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user