mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
add two recorder tests (more to come)
This commit is contained in:
@@ -56,6 +56,17 @@ pub trait Buffer: sealed::Sealed + std::ops::Deref<Target = [u8]> {
|
||||
/// The minimum alignment and size requirement for disk offsets and memory buffer size for direct IO.
|
||||
const DIO_CHUNK_SIZE: usize = 512;
|
||||
|
||||
/// If multiple chunks need to be read, merge adjacent chunk reads into batches of max size `MAX_CHUNK_BATCH_SIZE`.
|
||||
/// (The unit is the number of chunks.)
|
||||
const MAX_CHUNK_BATCH_SIZE: usize = {
|
||||
let desired = 128 * 1024; // 128k
|
||||
if desired % DIO_CHUNK_SIZE != 0 {
|
||||
panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE")
|
||||
// compile-time error
|
||||
}
|
||||
desired / DIO_CHUNK_SIZE
|
||||
};
|
||||
|
||||
/// Execute the given `reads` against `file`.
|
||||
/// The results are placed in the buffers of the [`ValueRead`]s.
|
||||
/// Retrieve the results by calling [`ValueRead::into_result`] on each [`ValueRead`].
|
||||
@@ -65,8 +76,8 @@ const DIO_CHUNK_SIZE: usize = 512;
|
||||
/// TODO: prevent this through type system.
|
||||
pub async fn execute<'a, I, F, B>(file: &F, reads: I, ctx: &RequestContext)
|
||||
where
|
||||
I: IntoIterator<Item = &'a ValueRead<B>> + Send,
|
||||
F: File + Send,
|
||||
I: IntoIterator<Item = &'a ValueRead<B>>,
|
||||
F: File,
|
||||
B: Buffer + IoBufMut + Send,
|
||||
{
|
||||
// Plan which parts of which chunks need to be appended to which buffer
|
||||
@@ -111,15 +122,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Merge adjacent chunk reads (merging pass on the BTreeMap iterator)
|
||||
const MAX_CHUNK_BATCH_SIZE: usize = {
|
||||
let desired = 128 * 1024; // 128k
|
||||
if desired % DIO_CHUNK_SIZE != 0 {
|
||||
panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE")
|
||||
// compile-time error
|
||||
}
|
||||
desired / DIO_CHUNK_SIZE
|
||||
};
|
||||
struct MergedRead<'a, B: Buffer> {
|
||||
start_chunk_no: u32,
|
||||
nchunks: u32,
|
||||
@@ -319,7 +321,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
struct TestValueRead {
|
||||
pos: u32,
|
||||
expected_result: Vec<u8>,
|
||||
@@ -382,6 +384,15 @@ mod tests {
|
||||
expected: RefCell<VecDeque<ExpectedRead>>,
|
||||
}
|
||||
|
||||
impl Drop for MockFile {
|
||||
fn drop(&mut self) {
|
||||
assert!(
|
||||
self.expected.borrow().is_empty(),
|
||||
"expected reads not satisfied"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! mock_file {
|
||||
($($pos:expr , $len:expr => $respond:expr),* $(,)?) => {{
|
||||
MockFile {
|
||||
@@ -470,4 +481,112 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(err.to_string(), "foo");
|
||||
}
|
||||
|
||||
struct RecorderFile<'a> {
|
||||
recorded: RefCell<Vec<RecordedRead>>,
|
||||
file: &'a InMemoryFile,
|
||||
}
|
||||
|
||||
struct RecordedRead {
|
||||
pos: u32,
|
||||
req_len: usize,
|
||||
res: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'a> RecorderFile<'a> {
|
||||
fn new(file: &'a InMemoryFile) -> RecorderFile<'a> {
|
||||
Self {
|
||||
recorded: Default::default(),
|
||||
file,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'x> File for RecorderFile<'x> {
|
||||
async fn read_at_to_end<'a, 'b, B: IoBufMut + Send>(
|
||||
&'b self,
|
||||
start: u32,
|
||||
dst: Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
) -> std::io::Result<(Slice<B>, usize)> {
|
||||
let (dst, nread) = self.file.read_at_to_end(start, dst, ctx).await?;
|
||||
self.recorded.borrow_mut().push(RecordedRead {
|
||||
pos: start,
|
||||
req_len: dst.bytes_total(),
|
||||
res: Vec::from(&dst[..nread]),
|
||||
});
|
||||
Ok((dst, nread))
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_and_validate_test_value_reads<I, F>(
|
||||
file: &F,
|
||||
test_value_reads: I,
|
||||
ctx: &RequestContext,
|
||||
) where
|
||||
I: IntoIterator<Item = TestValueRead>,
|
||||
F: File,
|
||||
{
|
||||
let (tmp, test_value_reads) = test_value_reads.into_iter().tee();
|
||||
let value_reads = tmp.map(|tr| tr.make_value_read()).collect::<Vec<_>>();
|
||||
execute(file, value_reads.iter(), &ctx).await;
|
||||
for (value_read, test_value_read) in value_reads.into_iter().zip(test_value_reads) {
|
||||
let res = value_read
|
||||
.into_result()
|
||||
.expect("InMemoryFile is infallible");
|
||||
assert_eq!(res, test_value_read.expected_result);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_value_reads_to_same_chunk_are_merged_into_one_chunk_read() {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let file = InMemoryFile::new_random(2 * DIO_CHUNK_SIZE);
|
||||
|
||||
let a = file.test_value_read(DIO_CHUNK_SIZE as u32, 10);
|
||||
let b = file.test_value_read(DIO_CHUNK_SIZE as u32 + 30, 20);
|
||||
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, vec![a, b], &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 1);
|
||||
let RecordedRead { pos, req_len, .. } = &recorded[0];
|
||||
assert_eq!(*pos, DIO_CHUNK_SIZE as u32);
|
||||
assert_eq!(*req_len, DIO_CHUNK_SIZE);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_max_chunk_batch_size_is_respected() {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let file = InMemoryFile::new_random(4 * MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE);
|
||||
|
||||
// read the 10th byte of each chunk 3 .. 3+2*MAX_CHUNK_BATCH_SIZE
|
||||
assert!(3 < MAX_CHUNK_BATCH_SIZE, "test assumption");
|
||||
assert!(10 < DIO_CHUNK_SIZE, "test assumption");
|
||||
let mut test_value_reads = Vec::new();
|
||||
for i in 3..3 + MAX_CHUNK_BATCH_SIZE + MAX_CHUNK_BATCH_SIZE / 2 {
|
||||
test_value_reads.push(file.test_value_read(i as u32 * DIO_CHUNK_SIZE as u32 + 10, 1));
|
||||
}
|
||||
|
||||
let recorder = RecorderFile::new(&file);
|
||||
|
||||
execute_and_validate_test_value_reads(&recorder, test_value_reads, &ctx).await;
|
||||
|
||||
let recorded = recorder.recorded.borrow();
|
||||
assert_eq!(recorded.len(), 2);
|
||||
{
|
||||
let RecordedRead { pos, req_len, .. } = &recorded[0];
|
||||
assert_eq!(*pos as usize, 3 * DIO_CHUNK_SIZE);
|
||||
assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE * DIO_CHUNK_SIZE);
|
||||
}
|
||||
{
|
||||
let RecordedRead { pos, req_len, .. } = &recorded[1];
|
||||
assert_eq!(*pos as usize, (3 + MAX_CHUNK_BATCH_SIZE) * DIO_CHUNK_SIZE);
|
||||
assert_eq!(*req_len, MAX_CHUNK_BATCH_SIZE / 2 * DIO_CHUNK_SIZE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user