diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index 3c79b9c3cb..785af671ec 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -56,6 +56,17 @@ pub trait Buffer: sealed::Sealed + std::ops::Deref { /// The minimum alignment and size requirement for disk offsets and memory buffer size for direct IO. const DIO_CHUNK_SIZE: usize = 512; +/// If multiple chunks need to be read, merge adjacent chunk reads into batches of max size `MAX_CHUNK_BATCH_SIZE`. +/// (The unit is the number of chunks.) +const MAX_CHUNK_BATCH_SIZE: usize = { + let desired = 128 * 1024; // 128k + if desired % DIO_CHUNK_SIZE != 0 { + panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE") + // compile-time error + } + desired / DIO_CHUNK_SIZE +}; + /// Execute the given `reads` against `file`. /// The results are placed in the buffers of the [`ValueRead`]s. /// Retrieve the results by calling [`ValueRead::into_result`] on each [`ValueRead`]. @@ -65,8 +76,8 @@ const DIO_CHUNK_SIZE: usize = 512; /// TODO: prevent this through type system. pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext) where - I: IntoIterator> + Send, - F: File + Send, + I: IntoIterator>, + F: File, B: Buffer + IoBufMut + Send, { // Plan which parts of which chunks need to be appended to which buffer @@ -111,15 +122,6 @@ where } } - // Merge adjacent chunk reads (merging pass on the BTreeMap iterator) - const MAX_CHUNK_BATCH_SIZE: usize = { - let desired = 128 * 1024; // 128k - if desired % DIO_CHUNK_SIZE != 0 { - panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE") - // compile-time error - } - desired / DIO_CHUNK_SIZE - }; struct MergedRead<'a, B: Buffer> { start_chunk_no: u32, nchunks: u32, @@ -319,7 +321,7 @@ mod tests { } } - #[derive(Debug)] + #[derive(Debug, Clone)] struct TestValueRead { pos: u32, expected_result: Vec, @@ -382,6 +384,15 @@ mod tests { expected: RefCell>, } + impl Drop for MockFile { + fn drop(&mut self) { + assert!( + self.expected.borrow().is_empty(), + "expected reads not satisfied" + ); + } + } + macro_rules! mock_file { ($($pos:expr , $len:expr => $respond:expr),* $(,)?) => {{ MockFile { @@ -470,4 +481,112 @@ mod tests { .unwrap(); assert_eq!(err.to_string(), "foo"); } + + struct RecorderFile<'a> { + recorded: RefCell>, + file: &'a InMemoryFile, + } + + struct RecordedRead { + pos: u32, + req_len: usize, + res: Vec, + } + + impl<'a> RecorderFile<'a> { + fn new(file: &'a InMemoryFile) -> RecorderFile<'a> { + Self { + recorded: Default::default(), + file, + } + } + } + + impl<'x> File for RecorderFile<'x> { + async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( + &'b self, + start: u32, + dst: Slice, + ctx: &'a RequestContext, + ) -> std::io::Result<(Slice, usize)> { + let (dst, nread) = self.file.read_at_to_end(start, dst, ctx).await?; + self.recorded.borrow_mut().push(RecordedRead { + pos: start, + req_len: dst.bytes_total(), + res: Vec::from(&dst[..nread]), + }); + Ok((dst, nread)) + } + } + + async fn execute_and_validate_test_value_reads( + file: &F, + test_value_reads: I, + ctx: &RequestContext, + ) where + I: IntoIterator, + F: File, + { + let (tmp, test_value_reads) = test_value_reads.into_iter().tee(); + let value_reads = tmp.map(|tr| tr.make_value_read()).collect::>(); + execute(file, value_reads.iter(), &ctx).await; + for (value_read, test_value_read) in value_reads.into_iter().zip(test_value_reads) { + let res = value_read + .into_result() + .expect("InMemoryFile is infallible"); + assert_eq!(res, test_value_read.expected_result); + } + } + + #[tokio::test] + async fn test_value_reads_to_same_chunk_are_merged_into_one_chunk_read() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE); + + let a = file.test_value_read(DIO_CHUNK_SIZE as u32, 10); + let b = file.test_value_read(DIO_CHUNK_SIZE as u32 + 30, 20); + + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_value_reads(&recorder, vec![a, b], &ctx).await; + + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 1); + let RecordedRead { pos, req_len, .. } = &recorded[0]; + assert_eq!(*pos, DIO_CHUNK_SIZE as u32); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + } + + #[tokio::test] + async fn test_max_chunk_batch_size_is_respected() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + + let file = InMemoryFile::new_random(4 * MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE); + + // read the 10th byte of each chunk 3 .. 3+2*MAX_CHUNK_BATCH_SIZE + assert!(3 < MAX_CHUNK_BATCH_SIZE, "test assumption"); + assert!(10 < DIO_CHUNK_SIZE, "test assumption"); + let mut test_value_reads = Vec::new(); + for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 { + test_value_reads.push(file.test_value_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1)); + } + + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_value_reads(&recorder, test_value_reads, &ctx).await; + + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 2); + { + let RecordedRead { pos, req_len, .. } = &recorded[0]; + assert_eq!(*pos as usize, 3 * DIO_CHUNK_SIZE); + assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE); + } + { + let RecordedRead { pos, req_len, .. } = &recorded[1]; + assert_eq!(*pos as usize, (3 + MAX_CHUNK_BATCH_SIZE) * DIO_CHUNK_SIZE); + assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE / 2 * DIO_CHUNK_SIZE); + } + } }