From 13ec4a073ce44e5f7a2e34deebd2fba2364c90be Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 21 Aug 2024 15:33:03 +0000 Subject: [PATCH] the great rename --- .../tenant/storage_layer/inmemory_layer.rs | 4 +- .../inmemory_layer/vectored_dio_read.rs | 342 +++++++++--------- 2 files changed, 172 insertions(+), 174 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index f49f2f5175..12608bd5e6 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -283,7 +283,7 @@ impl InMemoryLayer { struct ValueRead { entry_lsn: Lsn, - read: vectored_dio_read::ValueRead>, + read: vectored_dio_read::LogicalRead>, } let mut reads: HashMap> = HashMap::new(); @@ -303,7 +303,7 @@ impl InMemoryLayer { for (entry_lsn, index_value) in slice.iter().rev() { reads.entry(key).or_default().push(ValueRead { entry_lsn: *entry_lsn, - read: vectored_dio_read::ValueRead::new( + read: vectored_dio_read::LogicalRead::new( index_value.pos, Vec::with_capacity(index_value.len as usize), ), diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index aaeb92e472..04bf4b96f0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -22,13 +22,13 @@ pub trait File: Send { ) -> std::io::Result<(Slice, usize)>; } -/// A logical read that our user wants to do. -pub struct ValueRead { +/// A logical read from [`File`]. See [`Self::new`]. +pub struct LogicalRead { pos: u32, - state: RwLockRefCell>, + state: RwLockRefCell>, } -enum ValueReadState { +enum LogicalReadState { NotStarted(B), Ongoing(B), Ok(B), @@ -36,25 +36,25 @@ enum ValueReadState { Undefined, } -impl ValueRead { - /// Create a new [`ValueRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`. +impl LogicalRead { + /// Create a new [`LogicalRead`] from [`File`] of the data in the file in range `[ pos, pos + buf.cap() )`. pub fn new(pos: u32, buf: B) -> Self { Self { pos, - state: RwLockRefCell::new(ValueReadState::NotStarted(buf)), + state: RwLockRefCell::new(LogicalReadState::NotStarted(buf)), } } pub fn into_result(self) -> Option>> { match self.state.into_inner() { - ValueReadState::Ok(buf) => Some(Ok(buf)), - ValueReadState::Error(e) => Some(Err(e)), - ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => None, - ValueReadState::Undefined => unreachable!(), + LogicalReadState::Ok(buf) => Some(Ok(buf)), + LogicalReadState::Error(e) => Some(Err(e)), + LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => None, + LogicalReadState::Undefined => unreachable!(), } } } -/// The buffer into which a [`ValueRead`] result is placed. +/// The buffer into which a [`LogicalRead`] result is placed. pub trait Buffer: sealed::Sealed + std::ops::Deref { /// Immutable. fn cap(&self) -> usize; @@ -78,54 +78,53 @@ const MAX_CHUNK_BATCH_SIZE: usize = { desired / DIO_CHUNK_SIZE }; -/// Execute the given `reads` against `file`. -/// The results are placed in the buffers of the [`ValueRead`]s. -/// Retrieve the results by calling [`ValueRead::into_result`] on each [`ValueRead`]. +/// Execute the given logical `reads` against `file`. +/// The results are placed in the buffers of the [`LogicalRead`]s. +/// Retrieve the results by calling [`LogicalRead::into_result`] on each [`LogicalRead`]. /// -/// The [`ValueRead`]s must be freshly created using [`ValueRead::new`] when calling this function. -/// Otherwise, it might panic or the value resad result will be undefined. -/// TODO: prevent this through type system. +/// The [`LogicalRead`]s must be freshly created using [`LogicalRead::new`] when calling this function. +/// Otherwise, this function panics. pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext) where - I: IntoIterator>, + I: IntoIterator>, F: File, B: Buffer + IoBufMut + Send, { - // Preserve a copy of the value reads for debug assertions at the end + // Preserve a copy of the logical reads for debug assertions at the end #[cfg(debug_assertions)] - let (reads, assert_value_reads) = { + let (reads, assert_logical_reads) = { let (reads, assert) = reads.into_iter().tee(); (reads, Some(Vec::from_iter(assert))) }; #[cfg(not(debug_assertions))] - let (reads, assert_value_reads) = (reads, None); + let (reads, assert_logical_reads) = (reads, None); // Plan which parts of which chunks need to be appended to which buffer - struct ChunkReadDestination<'a, B: Buffer> { - value_read: &'a ValueRead, + let mut by_chunk: BTreeMap>> = BTreeMap::new(); + struct Interest<'a, B: Buffer> { + logical_read: &'a LogicalRead, offset_in_chunk: u32, len: u32, } - let mut chunk_reads: BTreeMap>> = BTreeMap::new(); - for value_read in reads { - let ValueRead { pos, state } = value_read; + for logical_read in reads { + let LogicalRead { pos, state } = logical_read; let mut state = state.borrow_mut(); // transition from NotStarted to Ongoing - let cur = std::mem::replace(&mut *state, ValueReadState::Undefined); + let cur = std::mem::replace(&mut *state, LogicalReadState::Undefined); let remaining = match cur { - ValueReadState::NotStarted(buf) => { + LogicalReadState::NotStarted(buf) => { if buf.len() != 0 { - panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"); + panic!("The `LogicalRead`s that are passed in must be freshly created using `LogicalRead::new`"); } // buf.cap() == 0 is ok // transition into Ongoing state let remaining = buf.cap(); - *state = ValueReadState::Ongoing(buf); + *state = LogicalReadState::Ongoing(buf); remaining } - x => panic!("must only call with fresh ValueReads, got another state, leaving Undefined state behind state={x:?}"), + x => panic!("must only call with fresh LogicalReads, got another state, leaving Undefined state behind state={x:?}"), }; // plan which chunks we need to read from @@ -134,35 +133,36 @@ where let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap(); while remaining > 0 { let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk); - chunk_reads - .entry(chunk_no) - .or_default() - .push(ChunkReadDestination { - value_read, - offset_in_chunk: offset_in_chunk as u32, - len: remaining_in_chunk as u32, - }); + by_chunk.entry(chunk_no).or_default().push(Interest { + logical_read, + offset_in_chunk: offset_in_chunk as u32, + len: remaining_in_chunk as u32, + }); offset_in_chunk = 0; chunk_no += 1; remaining -= remaining_in_chunk; } } - struct MergedRead<'a, B: Buffer> { + // At this point, we could iterate over by_chunk, in chunk order, + // read each chunk from disk, and fill the buffers. + // However, we can merge adjacent chunks into batches of MAX_CHUNK_BATCH_SIZE + // so we issue fewer IOs = fewer roundtrips = lower overall latency. + struct PhysicalRead<'a, B: Buffer> { start_chunk_no: u32, nchunks: u32, - dsts: Vec>, + dsts: Vec>, } - struct MergedChunkReadDestination<'a, B: Buffer> { - value_read: &'a ValueRead, - offset_in_merged_read: u32, + struct MergedInterest<'a, B: Buffer> { + logical_read: &'a LogicalRead, + offset_in_physical_read: u32, len: u32, } - let mut merged_reads: Vec> = Vec::new(); - let mut chunk_reads = chunk_reads.into_iter().peekable(); + let mut physical_reads: Vec> = Vec::new(); + let mut by_chunk = by_chunk.into_iter().peekable(); loop { let mut last_chunk_no = None; - let to_merge: Vec<(u32, Vec>)> = chunk_reads + let to_merge: Vec<(u32, Vec>)> = by_chunk .peeking_take_while(|(chunk_no, _)| { if let Some(last_chunk_no) = last_chunk_no { if *chunk_no != last_chunk_no + 1 { @@ -183,14 +183,14 @@ where .enumerate() .flat_map(|(i, (_, dsts))| { dsts.into_iter().map( - move |ChunkReadDestination { - value_read, + move |Interest { + logical_read, offset_in_chunk, len, }| { - MergedChunkReadDestination { - value_read, - offset_in_merged_read: i as u32 * DIO_CHUNK_SIZE as u32 + MergedInterest { + logical_read, + offset_in_physical_read: i as u32 * DIO_CHUNK_SIZE as u32 + offset_in_chunk, len, } @@ -198,105 +198,102 @@ where ) }) .collect(); - merged_reads.push(MergedRead { + physical_reads.push(PhysicalRead { start_chunk_no, nchunks, dsts, }); } - drop(chunk_reads); + drop(by_chunk); - // Execute reads and fill the destination + // Execute physical reads and fill the logical read buffers // TODO: prefetch let get_io_buffer = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE)); - for MergedRead { + for PhysicalRead { start_chunk_no, nchunks, dsts, - } in merged_reads + } in physical_reads { let all_done = dsts .iter() - .all(|MergedChunkReadDestination { value_read, .. }| { - value_read.state.borrow().is_terminal() - }); + .all(|MergedInterest { logical_read, .. }| logical_read.state.borrow().is_terminal()); if all_done { continue; } let read_offset = start_chunk_no * DIO_CHUNK_SIZE as u32; let io_buf = get_io_buffer(nchunks).slice_full(); let req_len = io_buf.len(); - let (merged_read_buf_slice, nread) = match file + let (io_buf_slice, nread) = match file .read_at_to_end(start_chunk_no * DIO_CHUNK_SIZE as u32, io_buf, ctx) .await { Ok(t) => t, Err(e) => { let e = Arc::new(e); - for MergedChunkReadDestination { value_read, .. } in dsts { - *value_read.state.borrow_mut() = ValueReadState::Error(Arc::clone(&e)); - // this will make later reads for the given ValueRead short-circuit, see top of loop body + for MergedInterest { logical_read, .. } in dsts { + *logical_read.state.borrow_mut() = LogicalReadState::Error(Arc::clone(&e)); + // this will make later reads for the given LogicalRead short-circuit, see top of loop body } continue; } }; - let merged_read_buf = merged_read_buf_slice.into_inner(); + let io_buf = io_buf_slice.into_inner(); assert!( - nread <= merged_read_buf.len(), + nread <= io_buf.len(), "the last chunk in the file can be a short read, so, no ==" ); - let merged_read_buf = &merged_read_buf[..nread]; - for MergedChunkReadDestination { - value_read, - offset_in_merged_read, + let io_buf = &io_buf[..nread]; + for MergedInterest { + logical_read, + offset_in_physical_read, len, } in dsts { - let mut value_read_state_borrow = value_read.state.borrow_mut(); - let value_read_buf = match &mut *value_read_state_borrow { - ValueReadState::NotStarted(_) => { + let mut logical_read_state_borrow = logical_read.state.borrow_mut(); + let logical_read_buf = match &mut *logical_read_state_borrow { + LogicalReadState::NotStarted(_) => { unreachable!("we transition it into Ongoing at function entry") } - ValueReadState::Ongoing(buf) => buf, - ValueReadState::Ok(_) | ValueReadState::Error(_) => { - debug_assert!(value_read_state_borrow.is_terminal()); + LogicalReadState::Ongoing(buf) => buf, + LogicalReadState::Ok(_) | LogicalReadState::Error(_) => { + debug_assert!(logical_read_state_borrow.is_terminal()); continue; } - ValueReadState::Undefined => unreachable!(), + LogicalReadState::Undefined => unreachable!(), }; struct Range { start: usize, // inclusive end: usize, // exclusive } - let range_in_merged_read_buf = Range { - start: offset_in_merged_read as usize, - end: offset_in_merged_read as usize + len as usize, + let range_in_io_buf = Range { + start: offset_in_physical_read as usize, + end: offset_in_physical_read as usize + len as usize, }; - assert!(range_in_merged_read_buf.end >= range_in_merged_read_buf.start); - if range_in_merged_read_buf.end > nread { + assert!(range_in_io_buf.end >= range_in_io_buf.start); + if range_in_io_buf.end > nread { let msg = format!( - "merged chunk read returned EOF where this value read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}", - &*value_read_state_borrow + "physical read returned EOF where this logical read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}", + &*logical_read_state_borrow ); - value_read_state_borrow.transition_to_terminal(Err(std::io::Error::new( + logical_read_state_borrow.transition_to_terminal(Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, msg, ))); continue; } - let data = - &merged_read_buf[range_in_merged_read_buf.start..range_in_merged_read_buf.end]; + let data = &io_buf[range_in_io_buf.start..range_in_io_buf.end]; - // Copy data from io buffer into the value read buffer. + // Copy data from io buffer into the logical read buffer. // (And in debug mode, validate that the buffer impl adheres to the Buffer trait spec.) let pre = if cfg!(debug_assertions) { - Some((value_read_buf.len(), value_read_buf.cap())) + Some((logical_read_buf.len(), logical_read_buf.cap())) } else { None }; - value_read_buf.extend_from_slice(data); + logical_read_buf.extend_from_slice(data); let post = if cfg!(debug_assertions) { - Some((value_read_buf.len(), value_read_buf.cap())) + Some((logical_read_buf.len(), logical_read_buf.cap())) } else { None }; @@ -309,41 +306,41 @@ where } } - if value_read_buf.len() == value_read_buf.cap() { - value_read_state_borrow.transition_to_terminal(Ok(())); + if logical_read_buf.len() == logical_read_buf.cap() { + logical_read_state_borrow.transition_to_terminal(Ok(())); } } } - if let Some(assert_value_reads) = assert_value_reads { - for value_read in assert_value_reads { - assert!(value_read.state.borrow().is_terminal()); + if let Some(assert_logical_reads) = assert_logical_reads { + for logical_read in assert_logical_reads { + assert!(logical_read.state.borrow().is_terminal()); } } } -impl ValueReadState { +impl LogicalReadState { fn is_terminal(&self) -> bool { match self { - ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => false, - ValueReadState::Ok(_) | ValueReadState::Error(_) => true, - ValueReadState::Undefined => unreachable!(), + LogicalReadState::NotStarted(_) | LogicalReadState::Ongoing(_) => false, + LogicalReadState::Ok(_) | LogicalReadState::Error(_) => true, + LogicalReadState::Undefined => unreachable!(), } } fn transition_to_terminal(&mut self, err: std::io::Result<()>) { - let cur = std::mem::replace(self, ValueReadState::Undefined); + let cur = std::mem::replace(self, LogicalReadState::Undefined); let buf = match cur { - ValueReadState::Ongoing(buf) => buf, + LogicalReadState::Ongoing(buf) => buf, x => panic!("must only call in state Ongoing, got {x:?}"), }; *self = match err { - Ok(()) => ValueReadState::Ok(buf), - Err(e) => ValueReadState::Error(Arc::new(e)), + Ok(()) => LogicalReadState::Ok(buf), + Err(e) => LogicalReadState::Error(Arc::new(e)), }; } } -impl std::fmt::Debug for ValueReadState { +impl std::fmt::Debug for LogicalReadState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { #[derive(Debug)] #[allow(unused)] @@ -360,15 +357,15 @@ impl std::fmt::Debug for ValueReadState { } } match self { - ValueReadState::NotStarted(b) => { + LogicalReadState::NotStarted(b) => { write!(f, "NotStarted({:?})", BufferDebug::from(b as &dyn Buffer)) } - ValueReadState::Ongoing(b) => { + LogicalReadState::Ongoing(b) => { write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer)) } - ValueReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)), - ValueReadState::Error(e) => write!(f, "Error({:?})", e), - ValueReadState::Undefined => write!(f, "Undefined"), + LogicalReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)), + LogicalReadState::Error(e) => write!(f, "Error({:?})", e), + LogicalReadState::Undefined => write!(f, "Undefined"), } } } @@ -433,13 +430,13 @@ mod tests { .collect(), } } - fn test_value_read(&self, pos: u32, len: usize) -> TestValueRead { + fn test_logical_read(&self, pos: u32, len: usize) -> TestLogicalRead { let expected_result = if pos as usize + len > self.content.len() { Err(format!("InMemoryFile short read")) } else { Ok(self.content[pos as usize..pos as usize + len].to_vec()) }; - TestValueRead::new(pos, len, expected_result) + TestLogicalRead::new(pos, len, expected_result) } } @@ -491,13 +488,13 @@ mod tests { } #[derive(Clone)] - struct TestValueRead { + struct TestLogicalRead { pos: u32, len: usize, expected_result: Result, String>, } - impl TestValueRead { + impl TestLogicalRead { fn new(pos: u32, len: usize, expected_result: Result, String>) -> Self { Self { pos, @@ -505,25 +502,25 @@ mod tests { expected_result, } } - fn make_value_read(&self) -> ValueRead> { - ValueRead::new(self.pos, Vec::with_capacity(self.len)) + fn make_logical_read(&self) -> LogicalRead> { + LogicalRead::new(self.pos, Vec::with_capacity(self.len)) } } - async fn execute_and_validate_test_value_reads( + async fn execute_and_validate_test_logical_reads( file: &F, - test_value_reads: I, + test_logical_reads: I, ctx: &RequestContext, ) where - I: IntoIterator, + I: IntoIterator, F: File, { - let (tmp, test_value_reads) = test_value_reads.into_iter().tee(); - let value_reads = tmp.map(|tr| tr.make_value_read()).collect::>(); - execute(file, value_reads.iter(), &ctx).await; - for (value_read, test_value_read) in value_reads.into_iter().zip(test_value_reads) { - let actual = value_read.into_result().expect("we call execute()"); - match (actual, test_value_read.expected_result) { + let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee(); + let logical_reads = tmp.map(|tr| tr.make_logical_read()).collect::>(); + execute(file, logical_reads.iter(), &ctx).await; + for (logical_read, test_logical_read) in logical_reads.into_iter().zip(test_logical_reads) { + let actual = logical_read.into_result().expect("we call execute()"); + match (actual, test_logical_read.expected_result) { (Ok(actual), Ok(expected)) if actual == expected => {} (Err(actual), Err(expected)) => { assert_eq!(actual.to_string(), expected); @@ -541,40 +538,40 @@ mod tests { let file = InMemoryFile::new_random(10 * cs); - let test_value_reads = vec![ - file.test_value_read(0, 1), - // adjacent to value_read0 - file.test_value_read(1, 2), + let test_logical_reads = vec![ + file.test_logical_read(0, 1), + // adjacent to logical_read0 + file.test_logical_read(1, 2), // gap // spans adjacent chunks - file.test_value_read(cs_u32 - 1, 2), + file.test_logical_read(cs_u32 - 1, 2), // gap // tail of chunk 3, all of chunk 4, and 2 bytes of chunk 5 - file.test_value_read(3 * cs_u32 - 1, cs + 2), + file.test_logical_read(3 * cs_u32 - 1, cs + 2), // gap - file.test_value_read(5 * cs_u32, 1), + file.test_logical_read(5 * cs_u32, 1), ]; - let num_test_value_reads = test_value_reads.len(); - let test_value_reads_perms = test_value_reads + let num_test_logical_reads = test_logical_reads.len(); + let test_logical_reads_perms = test_logical_reads .into_iter() - .permutations(num_test_value_reads); + .permutations(num_test_logical_reads); - // test all orderings of ValueReads, the order shouldn't matter for the results - for test_value_reads in test_value_reads_perms { - execute_and_validate_test_value_reads(&file, test_value_reads, &ctx).await; + // test all orderings of LogicalReads, the order shouldn't matter for the results + for test_logical_reads in test_logical_reads_perms { + execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await; } } #[tokio::test] #[should_panic] - async fn test_reusing_value_reads_panics() { + async fn test_reusing_logical_reads_panics() { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let file = InMemoryFile::new_random(DIO_CHUNK_SIZE); - let a = file.test_value_read(23, 10); - let value_reads = vec![a.make_value_read()]; - execute(&file, &value_reads, &ctx).await; + let a = file.test_logical_read(23, 10); + let logical_reads = vec![a.make_logical_read()]; + execute(&file, &logical_reads, &ctx).await; // reuse pancis - execute(&file, &value_reads, &ctx).await; + execute(&file, &logical_reads, &ctx).await; } struct RecorderFile<'a> { @@ -615,17 +612,17 @@ mod tests { } #[tokio::test] - async fn test_value_reads_to_same_chunk_are_merged_into_one_chunk_read() { + async fn test_logical_reads_to_same_chunk_are_merged_into_one_chunk_read() { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE); - let a = file.test_value_read(DIO_CHUNK_SIZE as u32, 10); - let b = file.test_value_read(DIO_CHUNK_SIZE as u32 + 30, 20); + let a = file.test_logical_read(DIO_CHUNK_SIZE as u32, 10); + let b = file.test_logical_read(DIO_CHUNK_SIZE as u32 + 30, 20); let recorder = RecorderFile::new(&file); - execute_and_validate_test_value_reads(&recorder, vec![a, b], &ctx).await; + execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await; let recorded = recorder.recorded.borrow(); assert_eq!(recorded.len(), 1); @@ -643,14 +640,15 @@ mod tests { // read the 10th byte of each chunk 3 .. 3+2*MAX_CHUNK_BATCH_SIZE assert!(3 < MAX_CHUNK_BATCH_SIZE, "test assumption"); assert!(10 < DIO_CHUNK_SIZE, "test assumption"); - let mut test_value_reads = Vec::new(); + let mut test_logical_reads = Vec::new(); for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 { - test_value_reads.push(file.test_value_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1)); + test_logical_reads + .push(file.test_logical_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1)); } let recorder = RecorderFile::new(&file); - execute_and_validate_test_value_reads(&recorder, test_value_reads, &ctx).await; + execute_and_validate_test_logical_reads(&recorder, test_logical_reads, &ctx).await; let recorded = recorder.recorded.borrow(); assert_eq!(recorded.len(), 2); @@ -673,12 +671,12 @@ mod tests { assert!(MAX_CHUNK_BATCH_SIZE > 10, "test assumption"); let file = InMemoryFile::new_random(3 * DIO_CHUNK_SIZE); - let a = file.test_value_read(0, 1); // chunk 0 - let b = file.test_value_read(2 * DIO_CHUNK_SIZE as u32, 1); // chunk 2 + let a = file.test_logical_read(0, 1); // chunk 0 + let b = file.test_logical_read(2 * DIO_CHUNK_SIZE as u32, 1); // chunk 2 let recorder = RecorderFile::new(&file); - execute_and_validate_test_value_reads(&recorder, vec![a, b], &ctx).await; + execute_and_validate_test_logical_reads(&recorder, vec![a, b], &ctx).await; let recorded = recorder.recorded.borrow(); @@ -804,39 +802,39 @@ mod tests { } #[tokio::test] - async fn test_error_on_one_chunk_read_fails_only_dependent_value_reads() { + async fn test_error_on_one_chunk_read_fails_only_dependent_logical_reads() { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); - let test_value_reads = vec![ + let test_logical_reads = vec![ // read spanning two batches - TestValueRead::new( + TestLogicalRead::new( DIO_CHUNK_SIZE as u32 / 2, MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE, Err("foo".to_owned()), ), // second read in failing chunk - TestValueRead::new( + TestLogicalRead::new( (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + DIO_CHUNK_SIZE as u32 - 10, 5, Err("foo".to_owned()), ), // read unaffected - TestValueRead::new( + TestLogicalRead::new( (MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE) as u32 + 2 * DIO_CHUNK_SIZE as u32 + 10, 5, Ok(vec![1; 5]), ), ]; - let (tmp, test_value_reads) = test_value_reads.into_iter().tee(); - let test_value_read_perms = tmp.permutations(test_value_reads.len()); + let (tmp, test_logical_reads) = test_logical_reads.into_iter().tee(); + let test_logical_read_perms = tmp.permutations(test_logical_reads.len()); - for test_value_reads in test_value_read_perms { + for test_logical_reads in test_logical_read_perms { let file = mock_file!( 0 * DIO_CHUNK_SIZE as u32, MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE => Ok(vec![0; MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE]), (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Err("foo".to_owned()), (MAX_CHUNK_BATCH_SIZE*DIO_CHUNK_SIZE + 2*DIO_CHUNK_SIZE) as u32, DIO_CHUNK_SIZE => Ok(vec![1; DIO_CHUNK_SIZE]), ); - execute_and_validate_test_value_reads(&file, test_value_reads, &ctx).await; + execute_and_validate_test_logical_reads(&file, test_logical_reads, &ctx).await; } } @@ -855,17 +853,17 @@ mod tests { #[tokio::test] async fn test_short_chunk_read_from_written_range() { - // Test what happens if there are value reads + // Test what happens if there are logical reads // that start within the last chunk, and // the last chunk is not the full chunk length. // // The read should succeed despite the short chunk length. let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); - let a = file.test_value_read(written - 10, 5); + let a = file.test_logical_read(written - 10, 5); let recorder = RecorderFile::new(&file); - execute_and_validate_test_value_reads(&recorder, vec![a], &ctx).await; + execute_and_validate_test_logical_reads(&recorder, vec![a], &ctx).await; let recorded = recorder.recorded.borrow(); assert_eq!(recorded.len(), 1); @@ -876,23 +874,23 @@ mod tests { } #[tokio::test] - async fn test_short_chunk_read_and_value_read_from_unwritten_range() { - // Test what happens if there are value reads + async fn test_short_chunk_read_and_logical_read_from_unwritten_range() { + // Test what happens if there are logical reads // that start within the last chunk, and // the last chunk is not the full chunk length, and - // the value reads end in the unwritten range. + // the logical reads end in the unwritten range. // // All should fail with UnexpectedEof and have the same IO pattern. async fn the_impl(offset_delta: i32) { let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); let offset = (written as i32 + offset_delta) as u32; - let a = file.test_value_read(offset, 5); + let a = file.test_logical_read(offset, 5); let recorder = RecorderFile::new(&file); - let a_vr = a.make_value_read(); + let a_vr = a.make_logical_read(); execute(&recorder, vec![&a_vr], &ctx).await; - // validate the ValueRead result + // validate the LogicalRead result let a_res = a_vr.into_result().unwrap(); let a_err = a_res.unwrap_err(); assert_eq!(a_err.kind(), std::io::ErrorKind::UnexpectedEof);