mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-05 13:10:37 +00:00
junk up owned_buffers_io from previous commit to deal with EphemeralFile speciality of reading zeroes past end-of-file
This makes it diverge semantically from what's in the tokio-epoll-uring download PR :(
This commit is contained in:
@@ -86,6 +86,7 @@ impl EphemeralFile {
|
||||
let buffered_offset = self.file.bytes_written();
|
||||
let flushed_offset = self.file.as_inner().as_inner().bytes_written();
|
||||
assert!(buffered_offset >= flushed_offset);
|
||||
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
|
||||
|
||||
assert_eq!(
|
||||
flushed_offset % (PAGE_SZ as u64),
|
||||
@@ -93,13 +94,6 @@ impl EphemeralFile {
|
||||
"we need this in the logic below, because it assumes the page isn't spread across flushed part and in-memory buffer"
|
||||
);
|
||||
|
||||
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
|
||||
if read_offset + (PAGE_SZ as u64) > buffered_offset {
|
||||
// TODO: handle out-of-bounds access, i.e., access past end of file currently panics
|
||||
// but should probably return an IO error. Pre-existing issue before this patch.
|
||||
todo!("return IO error: read past end of file")
|
||||
}
|
||||
|
||||
if read_offset < flushed_offset {
|
||||
assert!(
|
||||
read_offset + (PAGE_SZ as u64) <= flushed_offset,
|
||||
@@ -137,9 +131,22 @@ impl EphemeralFile {
|
||||
}
|
||||
};
|
||||
} else {
|
||||
let buffer: &[u8; Self::TAIL_SZ] = self.file.as_inner().inspect_buffer();
|
||||
let reads_past_end = read_offset + (PAGE_SZ as u64) <= buffered_offset;
|
||||
if reads_past_end {
|
||||
if cfg!(test) {
|
||||
// tests rely on being able to read zeroes from offsets [buffered_offset, next PAGE_SZ multiple of buffered_offset).
|
||||
// TODO: assert it's only up to the next PAGE_SZ
|
||||
} else {
|
||||
// TODO: treat this as error. Pre-existing issue before this patch.
|
||||
panic!(
|
||||
"return IO error: read past end of file: {read_offset:x} {buffered_offset:x}"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
let buffer: &[u8; Self::TAIL_SZ] = self.file.as_inner().inspect_buffer();
|
||||
let read_offset_in_buffer = read_offset.checked_sub(flushed_offset).unwrap();
|
||||
|
||||
let read_offset_in_buffer = read_offset - buffered_offset;
|
||||
let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
|
||||
let page = &buffer[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
|
||||
Ok(BlockLease::EphemeralFileMutableTail(
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use bytes::BytesMut;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
/// A trait for doing owned-buffer write IO.
|
||||
@@ -33,9 +32,11 @@ pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
|
||||
// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
// - after an IO error => stays `None` forever
|
||||
// In these exceptional cases, it's `None`.
|
||||
buf: Option<BytesMut>,
|
||||
buf: Option<zero_initialized_buffer::Buf<BUFFER_SIZE>>,
|
||||
}
|
||||
|
||||
mod zero_initialized_buffer;
|
||||
|
||||
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
|
||||
where
|
||||
W: OwnedAsyncWriter,
|
||||
@@ -43,7 +44,7 @@ where
|
||||
pub fn new(writer: W) -> Self {
|
||||
Self {
|
||||
writer,
|
||||
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
|
||||
buf: Some(zero_initialized_buffer::Buf::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,10 +55,10 @@ where
|
||||
/// panics if used after an error
|
||||
pub fn inspect_buffer(&self) -> &[u8; BUFFER_SIZE] {
|
||||
self.buf
|
||||
.as_deref()
|
||||
.expect("must not use after an error on the write path")
|
||||
.try_into()
|
||||
.unwrap()
|
||||
.as_ref()
|
||||
// TODO: can this happen on the EphemeralFile read path?
|
||||
.expect("must not use after an error")
|
||||
.as_zero_padded_slice()
|
||||
}
|
||||
|
||||
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
/// Fixed-capacity byte buffer of `N` bytes that starts out zero-filled.
///
/// Bytes are appended at the front of the allocation; everything past the
/// `written` prefix is still zero, which lets readers view the buffer as a
/// full `N`-byte, zero-padded array.
pub struct Buf<const N: usize> {
    /// Heap storage; always exactly `N` bytes long.
    allocation: Box<[u8; N]>,
    /// Number of bytes appended so far (`written <= N`).
    written: usize,
}
|
||||
|
||||
impl<const N: usize> Default for Buf<N> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
allocation: Box::new(
|
||||
// SAFETY: zeroed memory is a valid [u8; N]
|
||||
unsafe { MaybeUninit::zeroed().assume_init() },
|
||||
),
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> Buf<N> {
|
||||
#[inline(always)]
|
||||
fn invariants(&self) {
|
||||
debug_assert!(self.written <= N, "{}", self.written);
|
||||
}
|
||||
|
||||
pub fn as_zero_padded_slice(&self) -> &[u8; N] {
|
||||
&self.allocation
|
||||
}
|
||||
|
||||
/// panics if there's not enough capacity left
|
||||
pub fn extend_from_slice(&mut self, buf: &[u8]) {
|
||||
self.invariants();
|
||||
let can = N - self.written;
|
||||
let want = buf.len();
|
||||
assert!(want <= can, "{:x} {:x}", want, can);
|
||||
self.allocation[self.written..(self.written + want)].copy_from_slice(buf);
|
||||
self.written += want;
|
||||
self.invariants();
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.written
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.invariants();
|
||||
self.written = 0;
|
||||
self.allocation[..].fill(0);
|
||||
self.invariants();
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY:
///
/// - `stable_ptr` points into the boxed allocation, which is on the heap and
///   so does not move when the `Buf` value itself is moved.
/// - All `N` bytes of the allocation are initialized (zeroed at construction,
///   overwritten only by `extend_from_slice`), so reporting `written`
///   initialized bytes never exposes uninitialized memory.
unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buf<N> {
    fn stable_ptr(&self) -> *const u8 {
        self.allocation.as_ptr()
    }

    /// Only the bytes appended so far are reported as initialized payload.
    fn bytes_init(&self) -> usize {
        self.written
    }

    /// Total size of the underlying allocation. The trait defines this as the
    /// full buffer size, not the number of bytes written.
    fn bytes_total(&self) -> usize {
        N
    }
}
|
||||
Reference in New Issue
Block a user