mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
tests for short chunk reads + refactor for ValueReadState
This commit is contained in:
@@ -329,7 +329,7 @@ impl InMemoryLayer {
|
||||
// Process results into the reconstruct state
|
||||
'next_key: for (key, value_reads) in reads {
|
||||
for ValueRead { entry_lsn, read } in value_reads {
|
||||
match read.into_result() {
|
||||
match read.into_result().expect("we run execute() above") {
|
||||
Err(e) => {
|
||||
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
|
||||
continue 'next_key;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
use itertools::Itertools;
|
||||
@@ -23,11 +23,17 @@ pub trait File: Send {
|
||||
}
|
||||
|
||||
/// A logical read that our user wants to do.
|
||||
#[derive(Debug)]
|
||||
pub struct ValueRead<B: Buffer> {
|
||||
pos: u32,
|
||||
// TODO: use tri-state enum to distinguish between not started, started, and finished-with-ok-or-err
|
||||
state: MutexRefCell<Result<B, Arc<std::io::Error>>>,
|
||||
state: RwLockRefCell<ValueReadState<B>>,
|
||||
}
|
||||
|
||||
enum ValueReadState<B: Buffer> {
|
||||
NotStarted(B),
|
||||
Ongoing(B),
|
||||
Ok(B),
|
||||
Error(Arc<std::io::Error>),
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl<B: Buffer> ValueRead<B> {
|
||||
@@ -35,21 +41,25 @@ impl<B: Buffer> ValueRead<B> {
|
||||
pub fn new(pos: u32, buf: B) -> Self {
|
||||
Self {
|
||||
pos,
|
||||
state: MutexRefCell::new(Ok(buf)),
|
||||
state: RwLockRefCell::new(ValueReadState::NotStarted(buf)),
|
||||
}
|
||||
}
|
||||
pub fn into_result(self) -> Result<B, Arc<std::io::Error>> {
|
||||
self.state.into_inner()
|
||||
pub fn into_result(self) -> Option<Result<B, Arc<std::io::Error>>> {
|
||||
match self.state.into_inner() {
|
||||
ValueReadState::Ok(buf) => Some(Ok(buf)),
|
||||
ValueReadState::Error(e) => Some(Err(e)),
|
||||
ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => None,
|
||||
ValueReadState::Undefined => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The buffer into which a [`ValueRead`] result is placed.
|
||||
pub trait Buffer: sealed::Sealed + std::ops::Deref<Target = [u8]> {
|
||||
/// Immutable.
|
||||
fn cap(&self) -> usize;
|
||||
/// Changes only through [`Self::extend_from_slice`].
|
||||
fn len(&self) -> usize;
|
||||
fn remaining(&self) -> usize {
|
||||
self.cap().checked_sub(self.len()).unwrap()
|
||||
}
|
||||
/// Panics if the total length would exceed the initialized capacity.
|
||||
fn extend_from_slice(&mut self, src: &[u8]);
|
||||
}
|
||||
@@ -81,29 +91,44 @@ where
|
||||
F: File,
|
||||
B: Buffer + IoBufMut + Send,
|
||||
{
|
||||
// Preserve a copy of the value reads for debug assertions at the end
|
||||
#[cfg(debug_assertions)]
|
||||
let (reads, assert_value_reads) = {
|
||||
let (reads, assert) = reads.into_iter().tee();
|
||||
(reads, Some(Vec::from_iter(assert)))
|
||||
};
|
||||
#[cfg(not(debug_assertions))]
|
||||
let (reads, assert_value_reads) = (reads, None);
|
||||
|
||||
// Plan which parts of which chunks need to be appended to which buffer
|
||||
struct ChunkReadDestination<'a, B: Buffer> {
|
||||
value_read: &'a ValueRead<B>,
|
||||
offset_in_chunk: u32,
|
||||
len: u32,
|
||||
}
|
||||
// use of BTreeMap's sorted iterator is critical to ensure buffer is filled in order
|
||||
let mut chunk_reads: BTreeMap<u32, Vec<ChunkReadDestination<B>>> = BTreeMap::new();
|
||||
for value_read in reads {
|
||||
let ValueRead { pos, state } = value_read;
|
||||
let state = state.borrow();
|
||||
match state.as_ref() {
|
||||
Err(_) => panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"),
|
||||
Ok(buf) => {
|
||||
let mut state = state.borrow_mut();
|
||||
|
||||
// transition from NotStarted to Ongoing
|
||||
let cur = std::mem::replace(&mut *state, ValueReadState::Undefined);
|
||||
let remaining = match cur {
|
||||
ValueReadState::NotStarted(buf) => {
|
||||
if buf.len() != 0 {
|
||||
panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`");
|
||||
}
|
||||
// buf.cap() == 0 is ok
|
||||
|
||||
// transition into Ongoing state
|
||||
let remaining = buf.cap();
|
||||
*state = ValueReadState::Ongoing(buf);
|
||||
remaining
|
||||
}
|
||||
}
|
||||
let remaining = state
|
||||
.as_ref()
|
||||
.expect("we haven't started reading, no chance it's in Err() state")
|
||||
.remaining();
|
||||
x => panic!("must only call with fresh ValueReads, got another state, leaving Undefined state behind state={x:?}"),
|
||||
};
|
||||
|
||||
// plan which chunks we need to read from
|
||||
let mut remaining = usize::try_from(remaining).unwrap();
|
||||
let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32);
|
||||
let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap();
|
||||
@@ -183,7 +208,7 @@ where
|
||||
|
||||
// Execute reads and fill the destination
|
||||
// TODO: prefetch
|
||||
let get_chunk_buf = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE));
|
||||
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE));
|
||||
for MergedRead {
|
||||
start_chunk_no,
|
||||
nchunks,
|
||||
@@ -193,31 +218,33 @@ where
|
||||
let all_done = dsts
|
||||
.iter()
|
||||
.all(|MergedChunkReadDestination { value_read, .. }| {
|
||||
value_read.state.borrow().is_err()
|
||||
value_read.state.borrow().is_terminal()
|
||||
});
|
||||
if all_done {
|
||||
continue;
|
||||
}
|
||||
let read_offset = start_chunk_no * DIO_CHUNK_SIZE as u32;
|
||||
let io_buf = get_io_buffer(nchunks).slice_full();
|
||||
let req_len = io_buf.len();
|
||||
let (merged_read_buf_slice, nread) = match file
|
||||
.read_at_to_end(
|
||||
start_chunk_no * DIO_CHUNK_SIZE as u32,
|
||||
get_chunk_buf(nchunks).slice_full(),
|
||||
ctx,
|
||||
)
|
||||
.read_at_to_end(start_chunk_no * DIO_CHUNK_SIZE as u32, io_buf, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
let e = Arc::new(e);
|
||||
for MergedChunkReadDestination { value_read, .. } in dsts {
|
||||
*value_read.state.borrow_mut() = Err(Arc::clone(&e));
|
||||
*value_read.state.borrow_mut() = ValueReadState::Error(Arc::clone(&e));
|
||||
// this will make later reads for the given ValueRead short-circuit, see top of loop body
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let merged_read_buf = merged_read_buf_slice.into_inner();
|
||||
assert_eq!(nread, merged_read_buf.len());
|
||||
assert!(
|
||||
nread <= merged_read_buf.len(),
|
||||
"the last chunk in the file can be a short read, so, no =="
|
||||
);
|
||||
let merged_read_buf = &merged_read_buf[..nread];
|
||||
for MergedChunkReadDestination {
|
||||
value_read,
|
||||
@@ -225,27 +252,138 @@ where
|
||||
len,
|
||||
} in dsts
|
||||
{
|
||||
if let Ok(buf) = &mut *value_read.state.borrow_mut() {
|
||||
let data = &merged_read_buf
|
||||
[offset_in_merged_read as usize..(offset_in_merged_read + len) as usize];
|
||||
assert!(buf.remaining() >= data.len());
|
||||
buf.extend_from_slice(data);
|
||||
let mut value_read_state_borrow = value_read.state.borrow_mut();
|
||||
let value_read_buf = match &mut *value_read_state_borrow {
|
||||
ValueReadState::NotStarted(_) => {
|
||||
unreachable!("we transition it into Ongoing at function entry")
|
||||
}
|
||||
ValueReadState::Ongoing(buf) => buf,
|
||||
ValueReadState::Ok(_) | ValueReadState::Error(_) => {
|
||||
debug_assert!(value_read_state_borrow.is_terminal());
|
||||
continue;
|
||||
}
|
||||
ValueReadState::Undefined => unreachable!(),
|
||||
};
|
||||
struct Range {
|
||||
start: usize, // inclusive
|
||||
end: usize, // exclusive
|
||||
}
|
||||
let range_in_merged_read_buf = Range {
|
||||
start: offset_in_merged_read as usize,
|
||||
end: offset_in_merged_read as usize + len as usize,
|
||||
};
|
||||
assert!(range_in_merged_read_buf.end >= range_in_merged_read_buf.start);
|
||||
if range_in_merged_read_buf.end > nread {
|
||||
let msg = format!(
|
||||
"merged chunk read returned EOF where this value read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}",
|
||||
&*value_read_state_borrow
|
||||
);
|
||||
value_read_state_borrow.transition_to_terminal(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
msg,
|
||||
)));
|
||||
continue;
|
||||
}
|
||||
let data =
|
||||
&merged_read_buf[range_in_merged_read_buf.start..range_in_merged_read_buf.end];
|
||||
|
||||
// Copy data from io buffer into the value read buffer.
|
||||
// (And in debug mode, validate that the buffer impl adheres to the Buffer trait spec.)
|
||||
let pre = if cfg!(debug_assertions) {
|
||||
Some((value_read_buf.len(), value_read_buf.cap()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
value_read_buf.extend_from_slice(data);
|
||||
let post = if cfg!(debug_assertions) {
|
||||
Some((value_read_buf.len(), value_read_buf.cap()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
match (pre, post) {
|
||||
(None, None) => {}
|
||||
(Some(_), None) | (None, Some(_)) => unreachable!(),
|
||||
(Some((pre_len, pre_cap)), Some((post_len, post_cap))) => {
|
||||
assert_eq!(pre_len + len as usize, post_len);
|
||||
assert_eq!(pre_cap, post_cap);
|
||||
}
|
||||
}
|
||||
|
||||
if value_read_buf.len() == value_read_buf.cap() {
|
||||
value_read_state_borrow.transition_to_terminal(Ok(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(assert_value_reads) = assert_value_reads {
|
||||
for value_read in assert_value_reads {
|
||||
assert!(value_read.state.borrow().is_terminal());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Buffer> ValueReadState<B> {
|
||||
fn is_terminal(&self) -> bool {
|
||||
match self {
|
||||
ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => false,
|
||||
ValueReadState::Ok(_) | ValueReadState::Error(_) => true,
|
||||
ValueReadState::Undefined => unreachable!(),
|
||||
}
|
||||
}
|
||||
fn transition_to_terminal(&mut self, err: std::io::Result<()>) {
|
||||
let cur = std::mem::replace(self, ValueReadState::Undefined);
|
||||
let buf = match cur {
|
||||
ValueReadState::Ongoing(buf) => buf,
|
||||
x => panic!("must only call in state Ongoing, got {x:?}"),
|
||||
};
|
||||
*self = match err {
|
||||
Ok(()) => ValueReadState::Ok(buf),
|
||||
Err(e) => ValueReadState::Error(Arc::new(e)),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Buffer> std::fmt::Debug for ValueReadState<B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
#[derive(Debug)]
|
||||
#[allow(unused)]
|
||||
struct BufferDebug {
|
||||
len: usize,
|
||||
cap: usize,
|
||||
}
|
||||
impl<'a> From<&'a dyn Buffer> for BufferDebug {
|
||||
fn from(buf: &'a dyn Buffer) -> Self {
|
||||
Self {
|
||||
len: buf.len(),
|
||||
cap: buf.cap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
match self {
|
||||
ValueReadState::NotStarted(b) => {
|
||||
write!(f, "NotStarted({:?})", BufferDebug::from(b as &dyn Buffer))
|
||||
}
|
||||
ValueReadState::Ongoing(b) => {
|
||||
write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer))
|
||||
}
|
||||
ValueReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
|
||||
ValueReadState::Error(e) => write!(f, "Error({:?})", e),
|
||||
ValueReadState::Undefined => write!(f, "Undefined"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MutexRefCell<T>(Mutex<T>);
|
||||
impl<T> MutexRefCell<T> {
|
||||
struct RwLockRefCell<T>(RwLock<T>);
|
||||
impl<T> RwLockRefCell<T> {
|
||||
fn new(value: T) -> Self {
|
||||
Self(Mutex::new(value))
|
||||
Self(RwLock::new(value))
|
||||
}
|
||||
fn borrow(&self) -> impl std::ops::Deref<Target = T> + '_ {
|
||||
self.0.lock().unwrap()
|
||||
self.0.try_read().unwrap()
|
||||
}
|
||||
fn borrow_mut(&self) -> impl std::ops::DerefMut<Target = T> + '_ {
|
||||
self.0.lock().unwrap()
|
||||
self.0.try_write().unwrap()
|
||||
}
|
||||
fn into_inner(self) -> T {
|
||||
self.0.into_inner().unwrap()
|
||||
@@ -296,12 +434,38 @@ mod tests {
|
||||
}
|
||||
}
|
||||
fn test_value_read(&self, pos: u32, len: usize) -> TestValueRead {
|
||||
// InMemoryFile reads are infallible
|
||||
let expected_result = Ok(self.content[pos as usize..pos as usize + len].to_vec());
|
||||
let expected_result = if pos as usize + len > self.content.len() {
|
||||
Err(format!("InMemoryFile short read"))
|
||||
} else {
|
||||
Ok(self.content[pos as usize..pos as usize + len].to_vec())
|
||||
};
|
||||
TestValueRead::new(pos, len, expected_result)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_in_memory_file() {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
let file = InMemoryFile::new_random(10);
|
||||
let test_read = |pos, len| {
|
||||
let buf = vec![0; len];
|
||||
let fut = file.read_at_to_end(pos, buf.slice_full(), &ctx);
|
||||
use futures::FutureExt;
|
||||
let (slice, nread) = fut
|
||||
.now_or_never()
|
||||
.expect("impl never awaits")
|
||||
.expect("impl never errors");
|
||||
let mut buf = slice.into_inner();
|
||||
buf.truncate(nread);
|
||||
buf
|
||||
};
|
||||
assert_eq!(test_read(0, 1), &file.content[0..1]);
|
||||
assert_eq!(test_read(1, 2), &file.content[1..3]);
|
||||
assert_eq!(test_read(9, 2), &file.content[9..]);
|
||||
assert!(test_read(10, 2).is_empty());
|
||||
assert!(test_read(11, 2).is_empty());
|
||||
}
|
||||
|
||||
impl File for InMemoryFile {
|
||||
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
|
||||
&'b self,
|
||||
@@ -309,14 +473,20 @@ mod tests {
|
||||
mut dst: Slice<B>,
|
||||
_ctx: &'a RequestContext,
|
||||
) -> std::io::Result<(Slice<B>, usize)> {
|
||||
let len = std::cmp::min(
|
||||
dst.bytes_total(),
|
||||
self.content.len().saturating_sub(start as usize),
|
||||
);
|
||||
let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed();
|
||||
dst_slice[..len].copy_from_slice(&self.content[start as usize..start as usize + len]);
|
||||
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs
|
||||
Ok((dst, len))
|
||||
let nread = {
|
||||
let req_len = dst_slice.len();
|
||||
let len = std::cmp::min(req_len, self.content.len().saturating_sub(start as usize));
|
||||
if start as usize >= self.content.len() {
|
||||
0
|
||||
} else {
|
||||
dst_slice[..len]
|
||||
.copy_from_slice(&self.content[start as usize..start as usize + len]);
|
||||
len
|
||||
}
|
||||
};
|
||||
rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[nread..]); // to discover bugs
|
||||
Ok((dst, nread))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -352,7 +522,7 @@ mod tests {
|
||||
let value_reads = tmp.map(|tr| tr.make_value_read()).collect::<Vec<_>>();
|
||||
execute(file, value_reads.iter(), &ctx).await;
|
||||
for (value_read, test_value_read) in value_reads.into_iter().zip(test_value_reads) {
|
||||
let actual = value_read.into_result();
|
||||
let actual = value_read.into_result().expect("we call execute()");
|
||||
match (actual, test_value_read.expected_result) {
|
||||
(Ok(actual), Ok(expected)) if actual == expected => {}
|
||||
(Err(actual), Err(expected)) => {
|
||||
@@ -641,5 +811,78 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: short reads at end
|
||||
struct TestShortReadsSetup {
|
||||
ctx: RequestContext,
|
||||
file: InMemoryFile,
|
||||
written: u32,
|
||||
}
|
||||
fn setup_short_chunk_read_tests() -> TestShortReadsSetup {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
assert!(DIO_CHUNK_SIZE > 20, "test assumption");
|
||||
let written = (2 * DIO_CHUNK_SIZE - 10) as u32;
|
||||
let file = InMemoryFile::new_random(written as usize);
|
||||
TestShortReadsSetup { ctx, file, written }
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_short_chunk_read_from_written_range() {
|
||||
// Test what happens if there are value reads
|
||||
// that start within the last chunk, and
|
||||
// the last chunk is not the full chunk length.
|
||||
//
|
||||
// The read should succeed despite the short chunk length.
|
||||
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
|
||||
|
||||
let a = file.test_value_read(written - 10, 5);
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, vec![a], &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 1);
|
||||
let RecordedRead { pos, req_len, res } = &recorded[0];
|
||||
assert_eq!(*pos, DIO_CHUNK_SIZE as u32);
|
||||
assert_eq!(*req_len, DIO_CHUNK_SIZE);
|
||||
assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_short_chunk_read_and_value_read_from_unwritten_range() {
|
||||
// Test what happens if there are value reads
|
||||
// that start within the last chunk, and
|
||||
// the last chunk is not the full chunk length, and
|
||||
// the value reads end in the unwritten range.
|
||||
//
|
||||
// All should fail with UnexpectedEof and have the same IO pattern.
|
||||
async fn the_impl(offset_delta: i32) {
|
||||
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
|
||||
|
||||
let offset = (written as i32 + offset_delta) as u32;
|
||||
let a = file.test_value_read(offset, 5);
|
||||
let recorder = RecorderFile::new(&file);
|
||||
let a_vr = a.make_value_read();
|
||||
execute(&recorder, vec![&a_vr], &ctx).await;
|
||||
|
||||
// validate the ValueRead result
|
||||
let a_res = a_vr.into_result().unwrap();
|
||||
let a_err = a_res.unwrap_err();
|
||||
assert_eq!(a_err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
|
||||
// validate the IO pattern
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 1);
|
||||
let RecordedRead { pos, req_len, res } = &recorded[0];
|
||||
assert_eq!(*pos, DIO_CHUNK_SIZE as u32);
|
||||
assert_eq!(*req_len, DIO_CHUNK_SIZE);
|
||||
assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]);
|
||||
}
|
||||
|
||||
the_impl(-1).await; // start == length - 1
|
||||
the_impl(0).await; // start == length
|
||||
the_impl(1).await; // start == length + 1
|
||||
}
|
||||
|
||||
// TODO: mixed: some valid, some UnexpectedEof
|
||||
|
||||
// TODO: same tests but with merges
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user