From a38714a93d97c2e4cc5d39be7f652776cbc7cb86 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 21 Aug 2024 14:28:44 +0000 Subject: [PATCH] tests for short chunk reads + refactor for ValueReadState --- .../tenant/storage_layer/inmemory_layer.rs | 2 +- .../inmemory_layer/vectored_dio_read.rs | 343 +++++++++++++++--- 2 files changed, 294 insertions(+), 51 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 39fc05b39f..f49f2f5175 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -329,7 +329,7 @@ impl InMemoryLayer { // Process results into the reconstruct state 'next_key: for (key, value_reads) in reads { for ValueRead { entry_lsn, read } in value_reads { - match read.into_result() { + match read.into_result().expect("we run execute() above") { Err(e) => { reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e))); continue 'next_key; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs index f8daac4a17..6046d60dcc 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer/vectored_dio_read.rs @@ -1,6 +1,6 @@ use std::{ collections::BTreeMap, - sync::{Arc, Mutex}, + sync::{Arc, RwLock}, }; use itertools::Itertools; @@ -23,11 +23,17 @@ pub trait File: Send { } /// A logical read that our user wants to do. -#[derive(Debug)] pub struct ValueRead { pos: u32, - // TODO: use tri-state enum to distinguish between not started, started, and finished-with-ok-or-err - state: MutexRefCell>>, + state: RwLockRefCell>, +} + +enum ValueReadState { + NotStarted(B), + Ongoing(B), + Ok(B), + Error(Arc), + Undefined, } impl ValueRead { @@ -35,21 +41,25 @@ impl ValueRead { pub fn new(pos: u32, buf: B) -> Self { Self { pos, - state: MutexRefCell::new(Ok(buf)), + state: RwLockRefCell::new(ValueReadState::NotStarted(buf)), } } - pub fn into_result(self) -> Result> { - self.state.into_inner() + pub fn into_result(self) -> Option>> { + match self.state.into_inner() { + ValueReadState::Ok(buf) => Some(Ok(buf)), + ValueReadState::Error(e) => Some(Err(e)), + ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => None, + ValueReadState::Undefined => unreachable!(), + } } } /// The buffer into which a [`ValueRead`] result is placed. pub trait Buffer: sealed::Sealed + std::ops::Deref { + /// Immutable. fn cap(&self) -> usize; + /// Changes only through [`Self::extend_from_slice`]. fn len(&self) -> usize; - fn remaining(&self) -> usize { - self.cap().checked_sub(self.len()).unwrap() - } /// Panics if the total length would exceed the initialized capacity. fn extend_from_slice(&mut self, src: &[u8]); } @@ -81,29 +91,44 @@ where F: File, B: Buffer + IoBufMut + Send, { + // Preserve a copy of the value reads for debug assertions at the end + #[cfg(debug_assertions)] + let (reads, assert_value_reads) = { + let (reads, assert) = reads.into_iter().tee(); + (reads, Some(Vec::from_iter(assert))) + }; + #[cfg(not(debug_assertions))] + let (reads, assert_value_reads) = (reads, None); + // Plan which parts of which chunks need to be appended to which buffer struct ChunkReadDestination<'a, B: Buffer> { value_read: &'a ValueRead, offset_in_chunk: u32, len: u32, } - // use of BTreeMap's sorted iterator is critical to ensure buffer is filled in order let mut chunk_reads: BTreeMap>> = BTreeMap::new(); for value_read in reads { let ValueRead { pos, state } = value_read; - let state = state.borrow(); - match state.as_ref() { - Err(_) => panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"), - Ok(buf) => { + let mut state = state.borrow_mut(); + + // transition from NotStarted to Ongoing + let cur = std::mem::replace(&mut *state, ValueReadState::Undefined); + let remaining = match cur { + ValueReadState::NotStarted(buf) => { if buf.len() != 0 { panic!("The `ValueRead`s that are passed in must be freshly created using `ValueRead::new`"); } + // buf.cap() == 0 is ok + + // transition into Ongoing state + let remaining = buf.cap(); + *state = ValueReadState::Ongoing(buf); + remaining } - } - let remaining = state - .as_ref() - .expect("we haven't started reading, no chance it's in Err() state") - .remaining(); + x => panic!("must only call with fresh ValueReads, got another state, leaving Undefined state behind state={x:?}"), + }; + + // plan which chunks we need to read from let mut remaining = usize::try_from(remaining).unwrap(); let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32); let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap(); @@ -183,7 +208,7 @@ where // Execute reads and fill the destination // TODO: prefetch - let get_chunk_buf = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE)); + let get_io_buffer = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE)); for MergedRead { start_chunk_no, nchunks, @@ -193,31 +218,33 @@ where let all_done = dsts .iter() .all(|MergedChunkReadDestination { value_read, .. }| { - value_read.state.borrow().is_err() + value_read.state.borrow().is_terminal() }); if all_done { continue; } + let read_offset = start_chunk_no * DIO_CHUNK_SIZE as u32; + let io_buf = get_io_buffer(nchunks).slice_full(); + let req_len = io_buf.len(); let (merged_read_buf_slice, nread) = match file - .read_at_to_end( - start_chunk_no * DIO_CHUNK_SIZE as u32, - get_chunk_buf(nchunks).slice_full(), - ctx, - ) + .read_at_to_end(start_chunk_no * DIO_CHUNK_SIZE as u32, io_buf, ctx) .await { Ok(t) => t, Err(e) => { let e = Arc::new(e); for MergedChunkReadDestination { value_read, .. } in dsts { - *value_read.state.borrow_mut() = Err(Arc::clone(&e)); + *value_read.state.borrow_mut() = ValueReadState::Error(Arc::clone(&e)); // this will make later reads for the given ValueRead short-circuit, see top of loop body } continue; } }; let merged_read_buf = merged_read_buf_slice.into_inner(); - assert_eq!(nread, merged_read_buf.len()); + assert!( + nread <= merged_read_buf.len(), + "the last chunk in the file can be a short read, so, no ==" + ); let merged_read_buf = &merged_read_buf[..nread]; for MergedChunkReadDestination { value_read, @@ -225,27 +252,138 @@ where len, } in dsts { - if let Ok(buf) = &mut *value_read.state.borrow_mut() { - let data = &merged_read_buf - [offset_in_merged_read as usize..(offset_in_merged_read + len) as usize]; - assert!(buf.remaining() >= data.len()); - buf.extend_from_slice(data); + let mut value_read_state_borrow = value_read.state.borrow_mut(); + let value_read_buf = match &mut *value_read_state_borrow { + ValueReadState::NotStarted(_) => { + unreachable!("we transition it into Ongoing at function entry") + } + ValueReadState::Ongoing(buf) => buf, + ValueReadState::Ok(_) | ValueReadState::Error(_) => { + debug_assert!(value_read_state_borrow.is_terminal()); + continue; + } + ValueReadState::Undefined => unreachable!(), + }; + struct Range { + start: usize, // inclusive + end: usize, // exclusive } + let range_in_merged_read_buf = Range { + start: offset_in_merged_read as usize, + end: offset_in_merged_read as usize + len as usize, + }; + assert!(range_in_merged_read_buf.end >= range_in_merged_read_buf.start); + if range_in_merged_read_buf.end > nread { + let msg = format!( + "merged chunk read returned EOF where this value read expected more data in the file: offset=0x{read_offset:x} req_len=0x{req_len:x} nread=0x{nread:x} {:?}", + &*value_read_state_borrow + ); + value_read_state_borrow.transition_to_terminal(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + msg, + ))); + continue; + } + let data = + &merged_read_buf[range_in_merged_read_buf.start..range_in_merged_read_buf.end]; + + // Copy data from io buffer into the value read buffer. + // (And in debug mode, validate that the buffer impl adheres to the Buffer trait spec.) + let pre = if cfg!(debug_assertions) { + Some((value_read_buf.len(), value_read_buf.cap())) + } else { + None + }; + value_read_buf.extend_from_slice(data); + let post = if cfg!(debug_assertions) { + Some((value_read_buf.len(), value_read_buf.cap())) + } else { + None + }; + match (pre, post) { + (None, None) => {} + (Some(_), None) | (None, Some(_)) => unreachable!(), + (Some((pre_len, pre_cap)), Some((post_len, post_cap))) => { + assert_eq!(pre_len + len as usize, post_len); + assert_eq!(pre_cap, post_cap); + } + } + + if value_read_buf.len() == value_read_buf.cap() { + value_read_state_borrow.transition_to_terminal(Ok(())); + } + } + } + + if let Some(assert_value_reads) = assert_value_reads { + for value_read in assert_value_reads { + assert!(value_read.state.borrow().is_terminal()); + } + } +} + +impl ValueReadState { + fn is_terminal(&self) -> bool { + match self { + ValueReadState::NotStarted(_) | ValueReadState::Ongoing(_) => false, + ValueReadState::Ok(_) | ValueReadState::Error(_) => true, + ValueReadState::Undefined => unreachable!(), + } + } + fn transition_to_terminal(&mut self, err: std::io::Result<()>) { + let cur = std::mem::replace(self, ValueReadState::Undefined); + let buf = match cur { + ValueReadState::Ongoing(buf) => buf, + x => panic!("must only call in state Ongoing, got {x:?}"), + }; + *self = match err { + Ok(()) => ValueReadState::Ok(buf), + Err(e) => ValueReadState::Error(Arc::new(e)), + }; + } +} + +impl std::fmt::Debug for ValueReadState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + #[derive(Debug)] + #[allow(unused)] + struct BufferDebug { + len: usize, + cap: usize, + } + impl<'a> From<&'a dyn Buffer> for BufferDebug { + fn from(buf: &'a dyn Buffer) -> Self { + Self { + len: buf.len(), + cap: buf.cap(), + } + } + } + match self { + ValueReadState::NotStarted(b) => { + write!(f, "NotStarted({:?})", BufferDebug::from(b as &dyn Buffer)) + } + ValueReadState::Ongoing(b) => { + write!(f, "Ongoing({:?})", BufferDebug::from(b as &dyn Buffer)) + } + ValueReadState::Ok(b) => write!(f, "Ok({:?})", BufferDebug::from(b as &dyn Buffer)), + ValueReadState::Error(e) => write!(f, "Error({:?})", e), + ValueReadState::Undefined => write!(f, "Undefined"), } } } #[derive(Debug)] -struct MutexRefCell(Mutex); -impl MutexRefCell { +struct RwLockRefCell(RwLock); +impl RwLockRefCell { fn new(value: T) -> Self { - Self(Mutex::new(value)) + Self(RwLock::new(value)) } fn borrow(&self) -> impl std::ops::Deref + '_ { - self.0.lock().unwrap() + self.0.try_read().unwrap() } fn borrow_mut(&self) -> impl std::ops::DerefMut + '_ { - self.0.lock().unwrap() + self.0.try_write().unwrap() } fn into_inner(self) -> T { self.0.into_inner().unwrap() @@ -296,12 +434,38 @@ mod tests { } } fn test_value_read(&self, pos: u32, len: usize) -> TestValueRead { - // InMemoryFile reads are infallible - let expected_result = Ok(self.content[pos as usize..pos as usize + len].to_vec()); + let expected_result = if pos as usize + len > self.content.len() { + Err(format!("InMemoryFile short read")) + } else { + Ok(self.content[pos as usize..pos as usize + len].to_vec()) + }; TestValueRead::new(pos, len, expected_result) } } + #[test] + fn test_in_memory_file() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let file = InMemoryFile::new_random(10); + let test_read = |pos, len| { + let buf = vec![0; len]; + let fut = file.read_at_to_end(pos, buf.slice_full(), &ctx); + use futures::FutureExt; + let (slice, nread) = fut + .now_or_never() + .expect("impl never awaits") + .expect("impl never errors"); + let mut buf = slice.into_inner(); + buf.truncate(nread); + buf + }; + assert_eq!(test_read(0, 1), &file.content[0..1]); + assert_eq!(test_read(1, 2), &file.content[1..3]); + assert_eq!(test_read(9, 2), &file.content[9..]); + assert!(test_read(10, 2).is_empty()); + assert!(test_read(11, 2).is_empty()); + } + impl File for InMemoryFile { async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>( &'b self, @@ -309,14 +473,20 @@ mod tests { mut dst: Slice, _ctx: &'a RequestContext, ) -> std::io::Result<(Slice, usize)> { - let len = std::cmp::min( - dst.bytes_total(), - self.content.len().saturating_sub(start as usize), - ); let dst_slice: &mut [u8] = dst.as_mut_rust_slice_full_zeroed(); - dst_slice[..len].copy_from_slice(&self.content[start as usize..start as usize + len]); - rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[len..]); // to discover bugs - Ok((dst, len)) + let nread = { + let req_len = dst_slice.len(); + let len = std::cmp::min(req_len, self.content.len().saturating_sub(start as usize)); + if start as usize >= self.content.len() { + 0 + } else { + dst_slice[..len] + .copy_from_slice(&self.content[start as usize..start as usize + len]); + len + } + }; + rand::Rng::fill(&mut rand::thread_rng(), &mut dst_slice[nread..]); // to discover bugs + Ok((dst, nread)) } } @@ -352,7 +522,7 @@ mod tests { let value_reads = tmp.map(|tr| tr.make_value_read()).collect::>(); execute(file, value_reads.iter(), &ctx).await; for (value_read, test_value_read) in value_reads.into_iter().zip(test_value_reads) { - let actual = value_read.into_result(); + let actual = value_read.into_result().expect("we call execute()"); match (actual, test_value_read.expected_result) { (Ok(actual), Ok(expected)) if actual == expected => {} (Err(actual), Err(expected)) => { @@ -641,5 +811,78 @@ mod tests { } } - // TODO: short reads at end + struct TestShortReadsSetup { + ctx: RequestContext, + file: InMemoryFile, + written: u32, + } + fn setup_short_chunk_read_tests() -> TestShortReadsSetup { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + assert!(DIO_CHUNK_SIZE > 20, "test assumption"); + let written = (2 * DIO_CHUNK_SIZE - 10) as u32; + let file = InMemoryFile::new_random(written as usize); + TestShortReadsSetup { ctx, file, written } + } + + #[tokio::test] + async fn test_short_chunk_read_from_written_range() { + // Test what happens if there are value reads + // that start within the last chunk, and + // the last chunk is not the full chunk length. + // + // The read should succeed despite the short chunk length. + let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); + + let a = file.test_value_read(written - 10, 5); + let recorder = RecorderFile::new(&file); + + execute_and_validate_test_value_reads(&recorder, vec![a], &ctx).await; + + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 1); + let RecordedRead { pos, req_len, res } = &recorded[0]; + assert_eq!(*pos, DIO_CHUNK_SIZE as u32); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]); + } + + #[tokio::test] + async fn test_short_chunk_read_and_value_read_from_unwritten_range() { + // Test what happens if there are value reads + // that start within the last chunk, and + // the last chunk is not the full chunk length, and + // the value reads end in the unwritten range. + // + // All should fail with UnexpectedEof and have the same IO pattern. + async fn the_impl(offset_delta: i32) { + let TestShortReadsSetup { ctx, file, written } = setup_short_chunk_read_tests(); + + let offset = (written as i32 + offset_delta) as u32; + let a = file.test_value_read(offset, 5); + let recorder = RecorderFile::new(&file); + let a_vr = a.make_value_read(); + execute(&recorder, vec![&a_vr], &ctx).await; + + // validate the ValueRead result + let a_res = a_vr.into_result().unwrap(); + let a_err = a_res.unwrap_err(); + assert_eq!(a_err.kind(), std::io::ErrorKind::UnexpectedEof); + + // validate the IO pattern + let recorded = recorder.recorded.borrow(); + assert_eq!(recorded.len(), 1); + let RecordedRead { pos, req_len, res } = &recorded[0]; + assert_eq!(*pos, DIO_CHUNK_SIZE as u32); + assert_eq!(*req_len, DIO_CHUNK_SIZE); + assert_eq!(res, &file.content[DIO_CHUNK_SIZE..(written as usize)]); + } + + the_impl(-1).await; // start == length - 1 + the_impl(0).await; // start == length + the_impl(1).await; // start == length + 1 + } + + // TODO: mixed: some valid, some UnexpectedEof + + // TODO: same tests but with merges }