mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
the great rename
This commit is contained in:
@@ -283,7 +283,7 @@ impl InMemoryLayer {
|
||||
|
||||
struct ValueRead {
|
||||
entry_lsn: Lsn,
|
||||
read: vectored_dio_read::ValueRead<Vec<u8>>,
|
||||
read: vectored_dio_read::LogicalRead<Vec<u8>>,
|
||||
}
|
||||
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
|
||||
|
||||
@@ -303,7 +303,7 @@ impl InMemoryLayer {
|
||||
for (entry_lsn, index_value) in slice.iter().rev() {
|
||||
reads.entry(key).or_default().push(ValueRead {
|
||||
entry_lsn: *entry_lsn,
|
||||
read: vectored_dio_read::ValueRead::new(
|
||||
read: vectored_dio_read::LogicalRead::new(
|
||||
index_value.pos,
|
||||
Vec::with_capacity(index_value.len as usize),
|
||||
),
|
||||
|
||||
@@ -22,13 +22,13 @@ pub trait File: Send {
|
||||
) -> std::io::Result<(Slice<B>, usize)>;
|
||||
}
|
||||
|
||||
/// A logical read that our user wants to do.
|
||||
pub struct ValueRead<B: Buffer> {
|
||||
/// A logical read from [`File`]. See [`Self::new`].
|
||||
pub struct LogicalRead<B: Buffer> {
|
||||
pos: u32,
|
||||
state: RwLockRefCell<ValueReadState<B>>,
|
||||
state: RwLockRefCell<LogicalReadState<B>>,
|
||||
}
|
||||
|
||||
enum ValueReadState<B: Buffer> {
|
||||
enum LogicalReadState<B: Buffer> {
|
||||
NotStarted(B),
|
||||
Ongoing(B),
|
||||
Ok(B),
|
||||
@@ -36,25 +36,25 @@ enum ValueReadState<B: Buffer> {
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl<B: Buffer> ValueRead<B> {
|
||||
/// Create a new [`ValueRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`.
|
||||
impl<B: Buffer> LogicalRead<B> {
|
||||
/// Create a new [`LogicalRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`.
|
||||
pub fn new(pos: u32, buf: B) -> Self {
|
||||
Self {
|
||||
pos,
|
||||
state: RwLockRefCell::new(ValueReadState::NotStarted(buf)),
|
||||
state: RwLockRefCell::new(LogicalReadState::NotStarted(buf)),
|
||||
}
|
||||
}
|
||||
pub fn into_result(self) -> Option<Result<B, Arc<std::io::Error>>> {
|
||||
match self.state.into_inner() {
|
||||
ValueReadState::Ok(buf) => Some(Ok(buf)),
|
||||
ValueReadState::Error(e) => Some(Err(e)),
|
||||
ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => None,
|
||||
ValueReadState::Undefined => unreachable!(),
|
||||
LogicalReadState::Ok(buf) => Some(Ok(buf)),
|
||||
LogicalReadState::Error(e) => Some(Err(e)),
|
||||
LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => None,
|
||||
LogicalReadState::Undefined => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The buffer into which a [`ValueRead`] result is placed.
|
||||
/// The buffer into which a [`LogicalRead`] result is placed.
|
||||
pub trait Buffer: sealed::Sealed + std::ops::Deref<Target = [u8]> {
|
||||
/// Immutable.
|
||||
fn cap(&self) -> usize;
|
||||
@@ -78,54 +78,53 @@ const MAX_CHUNK_BATCH_SIZE: usize = {
|
||||
desired / DIO_CHUNK_SIZE
|
||||
};
|
||||
|
||||
/// Execute the given `reads` against `file`.
|
||||
/// The results are placed in the buffers of the [`ValueRead`]s.
|
||||
/// Retrieve the results by calling [`ValueRead::into_result`] on each [`ValueRead`].
|
||||
/// Execute the given logical `reads` against `file`.
|
||||
/// The results are placed in the buffers of the [`LogicalRead`]s.
|
||||
/// Retrieve the results by calling [`LogicalRead::into_result`] on each [`LogicalRead`].
|
||||
///
|
||||
/// The [`ValueRead`]s must be freshly created using [`ValueRead::new`] when calling this function.
|
||||
/// Otherwise, it might panic or the value resad result will be undefined.
|
||||
/// TODO: prevent this through type system.
|
||||
/// The [`LogicalRead`]s must be freshly created using [`LogicalRead::new`] when calling this function.
|
||||
/// Otherwise, this function panics.
|
||||
pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext)
|
||||
where
|
||||
I: IntoIterator<Item = &'a ValueRead<B>>,
|
||||
I: IntoIterator<Item = &'a LogicalRead<B>>,
|
||||
F: File,
|
||||
B: Buffer + IoBufMut + Send,
|
||||
{
|
||||
// Preserve a copy of the value reads for debug assertions at the end
|
||||
// Preserve a copy of the logical reads for debug assertions at the end
|
||||
#[cfg(debug_assertions)]
|
||||
let (reads, assert_value_reads) = {
|
||||
let (reads, assert_logical_reads) = {
|
||||
let (reads, assert) = reads.into_iter().tee();
|
||||
(reads, Some(Vec::from_iter(assert)))
|
||||
};
|
||||
#[cfg(not(debug_assertions))]
|
||||
let (reads, assert_value_reads) = (reads, None);
|
||||
let (reads, assert_logical_reads) = (reads, None);
|
||||
|
||||
// Plan which parts of which chunks need to be appended to which buffer
|
||||
struct ChunkReadDestination<'a, B: Buffer> {
|
||||
value_read: &'a ValueRead<B>,
|
||||
let mut by_chunk: BTreeMap<u32, Vec<Interest<B>>> = BTreeMap::new();
|
||||
struct Interest<'a, B: Buffer> {
|
||||
logical_read: &'a LogicalRead<B>,
|
||||
offset_in_chunk: u32,
|
||||
len: u32,
|
||||
}
|
||||
let mut chunk_reads: BTreeMap<u32, Vec<ChunkReadDestination<B>>> = BTreeMap::new();
|
||||
for value_read in reads {
|
||||
let ValueRead { pos, state } = value_read;
|
||||
for logical_read in reads {
|
||||
let LogicalRead { pos, state } = logical_read;
|
||||
let mut state = state.borrow_mut();
|
||||
|
||||
// transition from NotStarted to Ongoing
|
||||
let cur = std::mem::replace(&mut *state, ValueReadState::Undefined);
|
||||
let cur = std::mem::replace(&mut *state, LogicalReadState::Undefined);
|
||||
let remaining = match cur {
|
||||
ValueReadState::NotStarted(buf) => {
|
||||
LogicalReadState::NotStarted(buf) => {
|
||||
if buf.len() != 0 {
|
||||
panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`");
|
||||
panic!("The `LogicalRead`s that are passed in must be freshly created using `LogicalRead::new`");
|
||||
}
|
||||
// buf.cap() == 0 is ok
|
||||
|
||||
// transition into Ongoing state
|
||||
let remaining = buf.cap();
|
||||
*state = ValueReadState::Ongoing(buf);
|
||||
*state = LogicalReadState::Ongoing(buf);
|
||||
remaining
|
||||
}
|
||||
x => panic!("must only call with fresh ValueReads, got another state, leaving Undefined state behind state={x:?}"),
|
||||
x => panic!("must only call with fresh LogicalReads, got another state, leaving Undefined state behind state={x:?}"),
|
||||
};
|
||||
|
||||
// plan which chunks we need to read from
|
||||
@@ -134,35 +133,36 @@ where
|
||||
let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap();
|
||||
while remaining > 0 {
|
||||
let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk);
|
||||
chunk_reads
|
||||
.entry(chunk_no)
|
||||
.or_default()
|
||||
.push(ChunkReadDestination {
|
||||
value_read,
|
||||
offset_in_chunk: offset_in_chunk as u32,
|
||||
len: remaining_in_chunk as u32,
|
||||
});
|
||||
by_chunk.entry(chunk_no).or_default().push(Interest {
|
||||
logical_read,
|
||||
offset_in_chunk: offset_in_chunk as u32,
|
||||
len: remaining_in_chunk as u32,
|
||||
});
|
||||
offset_in_chunk = 0;
|
||||
chunk_no += 1;
|
||||
remaining -= remaining_in_chunk;
|
||||
}
|
||||
}
|
||||
|
||||
struct MergedRead<'a, B: Buffer> {
|
||||
// At this point, we could iterate over by_chunk, in chunk order,
|
||||
// read each chunk from disk, and fill the buffers.
|
||||
// However, we can merge adjacent chunks into batches of MAX_CHUNK_BATCH_SIZE
|
||||
// so we issue fewer IOs = fewer roundtrips = lower overall latency.
|
||||
struct PhysicalRead<'a, B: Buffer> {
|
||||
start_chunk_no: u32,
|
||||
nchunks: u32,
|
||||
dsts: Vec<MergedChunkReadDestination<'a, B>>,
|
||||
dsts: Vec<MergedInterest<'a, B>>,
|
||||
}
|
||||
struct MergedChunkReadDestination<'a, B: Buffer> {
|
||||
value_read: &'a ValueRead<B>,
|
||||
offset_in_merged_read: u32,
|
||||
struct MergedInterest<'a, B: Buffer> {
|
||||
logical_read: &'a LogicalRead<B>,
|
||||
offset_in_physical_read: u32,
|
||||
len: u32,
|
||||
}
|
||||
let mut merged_reads: Vec<MergedRead<B>> = Vec::new();
|
||||
let mut chunk_reads = chunk_reads.into_iter().peekable();
|
||||
let mut physical_reads: Vec<PhysicalRead<B>> = Vec::new();
|
||||
let mut by_chunk = by_chunk.into_iter().peekable();
|
||||
loop {
|
||||
let mut last_chunk_no = None;
|
||||
let to_merge: Vec<(u32, Vec<ChunkReadDestination<B>>)> = chunk_reads
|
||||
let to_merge: Vec<(u32, Vec<Interest<B>>)> = by_chunk
|
||||
.peeking_take_while(|(chunk_no, _)| {
|
||||
if let Some(last_chunk_no) = last_chunk_no {
|
||||
if *chunk_no != last_chunk_no + 1 {
|
||||
@@ -183,14 +183,14 @@ where
|
||||
.enumerate()
|
||||
.flat_map(|(i, (_, dsts))| {
|
||||
dsts.into_iter().map(
|
||||
move |ChunkReadDestination {
|
||||
value_read,
|
||||
move |Interest {
|
||||
logical_read,
|
||||
offset_in_chunk,
|
||||
len,
|
||||
}| {
|
||||
MergedChunkReadDestination {
|
||||
value_read,
|
||||
offset_in_merged_read: i as u32 * DIO_CHUNK_SIZE as u32
|
||||
MergedInterest {
|
||||
logical_read,
|
||||
offset_in_physical_read: i as u32 * DIO_CHUNK_SIZE as u32
|
||||
+ offset_in_chunk,
|
||||
len,
|
||||
}
|
||||
@@ -198,105 +198,102 @@ where
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
merged_reads.push(MergedRead {
|
||||
physical_reads.push(PhysicalRead {
|
||||
start_chunk_no,
|
||||
nchunks,
|
||||
dsts,
|
||||
});
|
||||
}
|
||||
drop(chunk_reads);
|
||||
drop(by_chunk);
|
||||
|
||||
// Execute reads and fill the destination
|
||||
// Execute physical reads and fill the logical read buffers
|
||||
// TODO: prefetch
|
||||
let get_io_buffer = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE));
|
||||
for MergedRead {
|
||||
for PhysicalRead {
|
||||
start_chunk_no,
|
||||
nchunks,
|
||||
dsts,
|
||||
} in merged_reads
|
||||
} in physical_reads
|
||||
{
|
||||
let all_done = dsts
|
||||
.iter()
|
||||
.all(|MergedChunkReadDestination { value_read, .. }| {
|
||||
value_read.state.borrow().is_terminal()
|
||||
});
|
||||
.all(|MergedInterest { logical_read, .. }| logical_read.state.borrow().is_terminal());
|
||||
if all_done {
|
||||
continue;
|
||||
}
|
||||
let read_offset = start_chunk_no * DIO_CHUNK_SIZE as u32;
|
||||
let io_buf = get_io_buffer(nchunks).slice_full();
|
||||
let req_len = io_buf.len();
|
||||
let (merged_read_buf_slice, nread) = match file
|
||||
let (io_buf_slice, nread) = match file
|
||||
.read_at_to_end(start_chunk_no * DIO_CHUNK_SIZE as u32, io_buf, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
let e = Arc::new(e);
|
||||
for MergedChunkReadDestination { value_read, .. } in dsts {
|
||||
*value_read.state.borrow_mut() = ValueReadState::Error(Arc::clone(&e));
|
||||
// this will make later reads for the given ValueRead short-circuit, see top of loop body
|
||||
for MergedInterest { logical_read, .. } in dsts {
|
||||
*logical_read.state.borrow_mut() = LogicalReadState::Error(Arc::clone(&e));
|
||||
// this will make later reads for the given LogicalRead short-circuit, see top of loop body
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let merged_read_buf = merged_read_buf_slice.into_inner();
|
||||
let io_buf = io_buf_slice.into_inner();
|
||||
assert!(
|
||||
nread <= merged_read_buf.len(),
|
||||
nread <= io_buf.len(),
|
||||
"the last chunk in the file can be a short read, so, no =="
|
||||
);
|
||||
let merged_read_buf = &merged_read_buf[..nread];
|
||||
for MergedChunkReadDestination {
|
||||
value_read,
|
||||
offset_in_merged_read,
|
||||
let io_buf = &io_buf[..nread];
|
||||
for MergedInterest {
|
||||
logical_read,
|
||||
offset_in_physical_read,
|
||||
len,
|
||||
} in dsts
|
||||
{
|
||||
let mut value_read_state_borrow = value_read.state.borrow_mut();
|
||||
let value_read_buf = match &mut *value_read_state_borrow {
|
||||
ValueReadState::NotStarted(_) => {
|
||||
let mut logical_read_state_borrow = logical_read.state.borrow_mut();
|
||||
let logical_read_buf = match &mut *logical_read_state_borrow {
|
||||
LogicalReadState::NotStarted(_) => {
|
||||
unreachable!("we transition it into Ongoing at function entry")
|
||||
}
|
||||
ValueReadState::Ongoing(buf) => buf,
|
||||
ValueReadState::Ok(_) | ValueReadState::Error(_) => {
|
||||
debug_assert!(value_read_state_borrow.is_terminal());
|
||||
LogicalReadState::Ongoing(buf) => buf,
|
||||
LogicalReadState::Ok(_) | LogicalReadState::Error(_) => {
|
||||
debug_assert!(logical_read_state_borrow.is_terminal());
|
||||
continue;
|
||||
}
|
||||
ValueReadState::Undefined => unreachable!(),
|
||||
LogicalReadState::Undefined => unreachable!(),
|
||||
};
|
||||
struct Range {
|
||||
start: usize, // inclusive
|
||||
end: usize, // exclusive
|
||||
}
|
||||
let range_in_merged_read_buf = Range {
|
||||
start: offset_in_merged_read as usize,
|
||||
end: offset_in_merged_read as usize + len as usize,
|
||||
let range_in_io_buf = Range {
|
||||
start: offset_in_physical_read as usize,
|
||||
end: offset_in_physical_read as usize + len as usize,
|
||||
};
|
||||
assert!(range_in_merged_read_buf.end >= range_in_merged_read_buf.start);
|
||||
if range_in_merged_read_buf.end > nread {
|
||||
assert!(range_in_io_buf.end >= range_in_io_buf.start);
|
||||
if range_in_io_buf.end > nread {
|
||||
let msg = format!(
|
||||
"merged chunk read returned EOF where this value read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}",
|
||||
&*value_read_state_borrow
|
||||
"physical read returned EOF where this logical read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}",
|
||||
&*logical_read_state_borrow
|
||||
);
|
||||
value_read_state_borrow.transition_to_terminal(Err(std::io::Error::new(
|
||||
logical_read_state_borrow.transition_to_terminal(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
msg,
|
||||
)));
|
||||
continue;
|
||||
}
|
||||
let data =
|
||||
&merged_read_buf[range_in_merged_read_buf.start..range_in_merged_read_buf.end];
|
||||
let data = &io_buf[range_in_io_buf.start..range_in_io_buf.end];
|
||||
|
||||
// Copy data from io buffer into the value read buffer.
|
||||
// Copy data from io buffer into the logical read buffer.
|
||||
// (And in debug mode, validate that the buffer impl adheres to the Buffer trait spec.)
|
||||
let pre = if cfg!(debug_assertions) {
|
||||
Some((value_read_buf.len(), value_read_buf.cap()))
|
||||
Some((logical_read_buf.len(), logical_read_buf.cap()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
value_read_buf.extend_from_slice(data);
|
||||
logical_read_buf.extend_from_slice(data);
|
||||
let post = if cfg!(debug_assertions) {
|
||||
Some((value_read_buf.len(), value_read_buf.cap()))
|
||||
Some((logical_read_buf.len(), logical_read_buf.cap()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -309,41 +306,41 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if value_read_buf.len() == value_read_buf.cap() {
|
||||
value_read_state_borrow.transition_to_terminal(Ok(()));
|
||||
if logical_read_buf.len() == logical_read_buf.cap() {
|
||||
logical_read_state_borrow.transition_to_terminal(Ok(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(assert_value_reads) = assert_value_reads {
|
||||
for value_read in assert_value_reads {
|
||||
assert!(value_read.state.borrow().is_terminal());
|
||||
if let Some(assert_logical_reads) = assert_logical_reads {
|
||||
for logical_read in assert_logical_reads {
|
||||
assert!(logical_read.state.borrow().is_terminal());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Buffer> ValueReadState<B> {
|
||||
impl<B: Buffer> LogicalReadState<B> {
|
||||
fn is_terminal(&self) -> bool {
|
||||
match self {
|
||||
ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => false,
|
||||
ValueReadState::Ok(_) | ValueReadState::Error(_) => true,
|
||||
ValueReadState::Undefined => unreachable!(),
|
||||
LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => false,
|
||||
LogicalReadState::Ok(_) | LogicalReadState::Error(_) => true,
|
||||
LogicalReadState::Undefined => unreachable!(),
|
||||
}
|
||||
}
|
||||
fn transition_to_terminal(&mut self, err: std::io::Result<()>) {
|
||||
let cur = std::mem::replace(self, ValueReadState::Undefined);
|
||||
let cur = std::mem::replace(self, LogicalReadState::Undefined);
|
||||
let buf = match cur {
|
||||
ValueReadState::Ongoing(buf) => buf,
|
||||
LogicalReadState::Ongoing(buf) => buf,
|
||||
x => panic!("must only call in state Ongoing, got {x:?}"),
|
||||
};
|
||||
*self = match err {
|
||||
Ok(()) => ValueReadState::Ok(buf),
|
||||
Err(e) => ValueReadState::Error(Arc::new(e)),
|
||||
Ok(()) => LogicalReadState::Ok(buf),
|
||||
Err(e) => LogicalReadState::Error(Arc::new(e)),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Buffer> std::fmt::Debug for ValueReadState<B> {
|
||||
impl<B: Buffer> std::fmt::Debug for LogicalReadState<B> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
#[derive(Debug)]
|
||||
#[allow(unused)]
|
||||
@@ -360,15 +357,15 @@ impl<B: Buffer> std::fmt::Debug for ValueReadState<B> {
|
||||
}
|
||||
}
|
||||
match self {
|
||||
ValueReadState::NotStarted(b) => {
|
||||
LogicalReadState::NotStarted(b) => {
|
||||
write!(f, "NotStarted({:?})", BufferDebug::from(b as &dyn Buffer))
|
||||
}
|
||||
ValueReadState::Ongoing(b) => {
|
||||
LogicalReadState::Ongoing(b) => {
|
||||
write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer))
|
||||
}
|
||||
ValueReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
|
||||
ValueReadState::Error(e) => write!(f, "Error({:?})", e),
|
||||
ValueReadState::Undefined => write!(f, "Undefined"),
|
||||
LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)),
|
||||
LogicalReadState::Error(e) => write!(f, "Error({:?})", e),
|
||||
LogicalReadState::Undefined => write!(f, "Undefined"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -433,13 +430,13 @@ mod tests {
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
fn test_value_read(&self, pos: u32, len: usize) -> TestValueRead {
|
||||
fn test_logical_read(&self, pos: u32, len: usize) -> TestLogicalRead {
|
||||
let expected_result = if pos as usize + len > self.content.len() {
|
||||
Err(format!("InMemoryFile short read"))
|
||||
} else {
|
||||
Ok(self.content[pos as usize..pos as usize + len].to_vec())
|
||||
};
|
||||
TestValueRead::new(pos, len, expected_result)
|
||||
TestLogicalRead::new(pos, len, expected_result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -491,13 +488,13 @@ mod tests {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestValueRead {
|
||||
struct TestLogicalRead {
|
||||
pos: u32,
|
||||
len: usize,
|
||||
expected_result: Result<Vec<u8>, String>,
|
||||
}
|
||||
|
||||
impl TestValueRead {
|
||||
impl TestLogicalRead {
|
||||
fn new(pos: u32, len: usize, expected_result: Result<Vec<u8>, String>) -> Self {
|
||||
Self {
|
||||
pos,
|
||||
@@ -505,25 +502,25 @@ mod tests {
|
||||
expected_result,
|
||||
}
|
||||
}
|
||||
fn make_value_read(&self) -> ValueRead<Vec<u8>> {
|
||||
ValueRead::new(self.pos, Vec::with_capacity(self.len))
|
||||
fn make_logical_read(&self) -> LogicalRead<Vec<u8>> {
|
||||
LogicalRead::new(self.pos, Vec::with_capacity(self.len))
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_and_validate_test_value_reads<I, F>(
|
||||
async fn execute_and_validate_test_logical_reads<I, F>(
|
||||
file: &F,
|
||||
test_value_reads: I,
|
||||
test_logical_reads: I,
|
||||
ctx: &RequestContext,
|
||||
) where
|
||||
I: IntoIterator<Item = TestValueRead>,
|
||||
I: IntoIterator<Item = TestLogicalRead>,
|
||||
F: File,
|
||||
{
|
||||
let (tmp, test_value_reads) = test_value_reads.into_iter().tee();
|
||||
let value_reads = tmp.map(|tr| tr.make_value_read()).collect::<Vec<_>>();
|
||||
execute(file, value_reads.iter(), &ctx).await;
|
||||
for (value_read, test_value_read) in value_reads.into_iter().zip(test_value_reads) {
|
||||
let actual = value_read.into_result().expect("we call execute()");
|
||||
match (actual, test_value_read.expected_result) {
|
||||
let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee();
|
||||
let logical_reads = tmp.map(|tr| tr.make_logical_read()).collect::<Vec<_>>();
|
||||
execute(file, logical_reads.iter(), &ctx).await;
|
||||
for (logical_read, test_logical_read) in logical_reads.into_iter().zip(test_logical_reads) {
|
||||
let actual = logical_read.into_result().expect("we call execute()");
|
||||
match (actual, test_logical_read.expected_result) {
|
||||
(Ok(actual), Ok(expected)) if actual == expected => {}
|
||||
(Err(actual), Err(expected)) => {
|
||||
assert_eq!(actual.to_string(), expected);
|
||||
@@ -541,40 +538,40 @@ mod tests {
|
||||
|
||||
let file = InMemoryFile::new_random(10 * cs);
|
||||
|
||||
let test_value_reads = vec![
|
||||
file.test_value_read(0, 1),
|
||||
// adjacent to value_read0
|
||||
file.test_value_read(1, 2),
|
||||
let test_logical_reads = vec![
|
||||
file.test_logical_read(0, 1),
|
||||
// adjacent to logical_read0
|
||||
file.test_logical_read(1, 2),
|
||||
// gap
|
||||
// spans adjacent chunks
|
||||
file.test_value_read(cs_u32 - 1, 2),
|
||||
file.test_logical_read(cs_u32 - 1, 2),
|
||||
// gap
|
||||
// tail of chunk 3, all of chunk 4, and 2 bytes of chunk 5
|
||||
file.test_value_read(3 * cs_u32 - 1, cs + 2),
|
||||
file.test_logical_read(3 * cs_u32 - 1, cs + 2),
|
||||
// gap
|
||||
file.test_value_read(5 * cs_u32, 1),
|
||||
file.test_logical_read(5 * cs_u32, 1),
|
||||
];
|
||||
let num_test_value_reads = test_value_reads.len();
|
||||
let test_value_reads_perms = test_value_reads
|
||||
let num_test_logical_reads = test_logical_reads.len();
|
||||
let test_logical_reads_perms = test_logical_reads
|
||||
.into_iter()
|
||||
.permutations(num_test_value_reads);
|
||||
.permutations(num_test_logical_reads);
|
||||
|
||||
// test all orderings of ValueReads, the order shouldn't matter for the results
|
||||
for test_value_reads in test_value_reads_perms {
|
||||
execute_and_validate_test_value_reads(&file, test_value_reads, &ctx).await;
|
||||
// test all orderings of LogicalReads, the order shouldn't matter for the results
|
||||
for test_logical_reads in test_logical_reads_perms {
|
||||
execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic]
|
||||
async fn test_reusing_value_reads_panics() {
|
||||
async fn test_reusing_logical_reads_panics() {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
let file = InMemoryFile::new_random(DIO_CHUNK_SIZE);
|
||||
let a = file.test_value_read(23, 10);
|
||||
let value_reads = vec![a.make_value_read()];
|
||||
execute(&file, &value_reads, &ctx).await;
|
||||
let a = file.test_logical_read(23, 10);
|
||||
let logical_reads = vec![a.make_logical_read()];
|
||||
execute(&file, &logical_reads, &ctx).await;
|
||||
// reuse pancis
|
||||
execute(&file, &value_reads, &ctx).await;
|
||||
execute(&file, &logical_reads, &ctx).await;
|
||||
}
|
||||
|
||||
struct RecorderFile<'a> {
|
||||
@@ -615,17 +612,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_value_reads_to_same_chunk_are_merged_into_one_chunk_read() {
|
||||
async fn test_logical_reads_to_same_chunk_are_merged_into_one_chunk_read() {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE);
|
||||
|
||||
let a = file.test_value_read(DIO_CHUNK_SIZE as u32, 10);
|
||||
let b = file.test_value_read(DIO_CHUNK_SIZE as u32 + 30, 20);
|
||||
let a = file.test_logical_read(DIO_CHUNK_SIZE as u32, 10);
|
||||
let b = file.test_logical_read(DIO_CHUNK_SIZE as u32 + 30, 20);
|
||||
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, vec![a, b], &ctx).await;
|
||||
execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 1);
|
||||
@@ -643,14 +640,15 @@ mod tests {
|
||||
// read the 10th byte of each chunk 3 .. 3+2*MAX_CHUNK_BATCH_SIZE
|
||||
assert!(3 < MAX_CHUNK_BATCH_SIZE, "test assumption");
|
||||
assert!(10 < DIO_CHUNK_SIZE, "test assumption");
|
||||
let mut test_value_reads = Vec::new();
|
||||
let mut test_logical_reads = Vec::new();
|
||||
for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 {
|
||||
test_value_reads.push(file.test_value_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1));
|
||||
test_logical_reads
|
||||
.push(file.test_logical_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1));
|
||||
}
|
||||
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, test_value_reads, &ctx).await;
|
||||
execute_and_validate_test_logical_reads(&recorder, test_logical_reads, &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 2);
|
||||
@@ -673,12 +671,12 @@ mod tests {
|
||||
assert!(MAX_CHUNK_BATCH_SIZE > 10, "test assumption");
|
||||
let file = InMemoryFile::new_random(3 * DIO_CHUNK_SIZE);
|
||||
|
||||
let a = file.test_value_read(0, 1); // chunk 0
|
||||
let b = file.test_value_read(2 * DIO_CHUNK_SIZE as u32, 1); // chunk 2
|
||||
let a = file.test_logical_read(0, 1); // chunk 0
|
||||
let b = file.test_logical_read(2 * DIO_CHUNK_SIZE as u32, 1); // chunk 2
|
||||
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, vec![a, b], &ctx).await;
|
||||
execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
|
||||
@@ -804,39 +802,39 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_error_on_one_chunk_read_fails_only_dependent_value_reads() {
|
||||
async fn test_error_on_one_chunk_read_fails_only_dependent_logical_reads() {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let test_value_reads = vec![
|
||||
let test_logical_reads = vec![
|
||||
// read spanning two batches
|
||||
TestValueRead::new(
|
||||
TestLogicalRead::new(
|
||||
DIO_CHUNK_SIZE as u32 / 2,
|
||||
MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE,
|
||||
Err("foo".to_owned()),
|
||||
),
|
||||
// second read in failing chunk
|
||||
TestValueRead::new(
|
||||
TestLogicalRead::new(
|
||||
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + DIO_CHUNK_SIZE as u32 - 10,
|
||||
5,
|
||||
Err("foo".to_owned()),
|
||||
),
|
||||
// read unaffected
|
||||
TestValueRead::new(
|
||||
TestLogicalRead::new(
|
||||
(MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + 2 * DIO_CHUNK_SIZE as u32 + 10,
|
||||
5,
|
||||
Ok(vec![1; 5]),
|
||||
),
|
||||
];
|
||||
let (tmp, test_value_reads) = test_value_reads.into_iter().tee();
|
||||
let test_value_read_perms = tmp.permutations(test_value_reads.len());
|
||||
let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee();
|
||||
let test_logical_read_perms = tmp.permutations(test_logical_reads.len());
|
||||
|
||||
for test_value_reads in test_value_read_perms {
|
||||
for test_logical_reads in test_logical_read_perms {
|
||||
let file = mock_file!(
|
||||
0 * DIO_CHUNK_SIZE as u32, MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE => Ok(vec![0; MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE]),
|
||||
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Err("foo".to_owned()),
|
||||
(MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]),
|
||||
);
|
||||
execute_and_validate_test_value_reads(&file, test_value_reads, &ctx).await;
|
||||
execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -855,17 +853,17 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_short_chunk_read_from_written_range() {
|
||||
// Test what happens if there are value reads
|
||||
// Test what happens if there are logical reads
|
||||
// that start within the last chunk, and
|
||||
// the last chunk is not the full chunk length.
|
||||
//
|
||||
// The read should succeed despite the short chunk length.
|
||||
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
|
||||
|
||||
let a = file.test_value_read(written - 10, 5);
|
||||
let a = file.test_logical_read(written - 10, 5);
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, vec![a], &ctx).await;
|
||||
execute_and_validate_test_logical_reads(&recorder, vec![a], &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 1);
|
||||
@@ -876,23 +874,23 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_short_chunk_read_and_value_read_from_unwritten_range() {
|
||||
// Test what happens if there are value reads
|
||||
async fn test_short_chunk_read_and_logical_read_from_unwritten_range() {
|
||||
// Test what happens if there are logical reads
|
||||
// that start within the last chunk, and
|
||||
// the last chunk is not the full chunk length, and
|
||||
// the value reads end in the unwritten range.
|
||||
// the logical reads end in the unwritten range.
|
||||
//
|
||||
// All should fail with UnexpectedEof and have the same IO pattern.
|
||||
async fn the_impl(offset_delta: i32) {
|
||||
let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests();
|
||||
|
||||
let offset = (written as i32 + offset_delta) as u32;
|
||||
let a = file.test_value_read(offset, 5);
|
||||
let a = file.test_logical_read(offset, 5);
|
||||
let recorder = RecorderFile::new(&file);
|
||||
let a_vr = a.make_value_read();
|
||||
let a_vr = a.make_logical_read();
|
||||
execute(&recorder, vec![&a_vr], &ctx).await;
|
||||
|
||||
// validate the ValueRead result
|
||||
// validate the LogicalRead result
|
||||
let a_res = a_vr.into_result().unwrap();
|
||||
let a_err = a_res.unwrap_err();
|
||||
assert_eq!(a_err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
|
||||
Reference in New Issue
Block a user