merging of adjacent chunk reads, up to max batch size

This commit is contained in:
Christian Schwarz
2024-08-15 10:29:20 +00:00
parent 6f65b4d2d3
commit fb78185074

View File

@@ -15,6 +15,7 @@ use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use itertools::Itertools;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
@@ -319,7 +320,7 @@ impl InMemoryLayer {
const DIO_CHUNK_SIZE: usize = 512;
// Plan which parts of which pages need to be appended to which value_buf
// Plan which parts of which chunks need to be appended to which value_buf
struct ChunkReadDestination<'a> {
value_read: &'a ValueRead,
offset_in_chunk: u32,
@@ -348,28 +349,99 @@ impl InMemoryLayer {
}
}
// TODO: merge adjacent chunk reads (merging pass on the BTreeMap iterator)
// Merge adjacent chunk reads (merging pass on the BTreeMap iterator)
const MAX_CHUNK_BATCH_SIZE: usize = {
let desired = 128 * 1024; // 128k
if desired % DIO_CHUNK_SIZE != 0 {
panic!("MAX_CHUNK_BATCH_SIZE must be a multiple of DIO_CHUNK_SIZE")
// compile-time error
}
desired / DIO_CHUNK_SIZE
};
struct MergedRead<'a> {
start_chunk_no: u32,
nchunks: u32,
dsts: Vec<MergedChunkReadDestination<'a>>,
}
struct MergedChunkReadDestination<'a> {
value_read: &'a ValueRead,
offset_in_merged_read: u32,
len: u32,
}
let mut merged_reads: Vec<MergedRead> = Vec::new();
let mut chunk_reads = chunk_reads.into_iter().peekable();
loop {
let mut last_chunk_no = None;
let to_merge: Vec<(u32, Vec<ChunkReadDestination>)> = chunk_reads
.peeking_take_while(|(chunk_no, _)| {
if let Some(last_chunk_no) = last_chunk_no {
if *chunk_no != last_chunk_no + 1 {
return false;
}
}
last_chunk_no = Some(*chunk_no);
true
})
.take(MAX_CHUNK_BATCH_SIZE)
.collect(); // TODO: avoid this .collect()
let Some(start_chunk_no) = to_merge.first().map(|(chunk_no, _)| *chunk_no) else {
break;
};
let nchunks = to_merge.len() as u32;
let dsts = to_merge
.into_iter()
.enumerate()
.flat_map(|(i, (_, dsts))| {
dsts.into_iter().map(
move |ChunkReadDestination {
value_read,
offset_in_chunk,
len,
}| {
MergedChunkReadDestination {
value_read,
offset_in_merged_read: i as u32 * DIO_CHUNK_SIZE as u32
+ offset_in_chunk,
len,
}
},
)
})
.collect();
merged_reads.push(MergedRead {
start_chunk_no,
nchunks,
dsts,
});
}
drop(chunk_reads);
// Execute reads and fill the destination
// TODO: prefetch
let get_chunk_buf = || Vec::with_capacity(DIO_CHUNK_SIZE);
let mut chunk_buf = get_chunk_buf();
for (chunk_no, dsts) in chunk_reads.into_iter() {
let all_done = dsts.iter().all(|ChunkReadDestination { value_read, .. }| {
let value_buf = value_read.value_buf.lock().unwrap();
let Ok(buf) = &*value_buf else {
return true; // on Err() there's no need to read more
};
buf.len() == value_read.len as usize
});
let get_chunk_buf = |nchunks| Vec::with_capacity(nchunks as usize * (DIO_CHUNK_SIZE));
for MergedRead {
start_chunk_no,
nchunks,
dsts,
} in merged_reads
{
let all_done = dsts
.iter()
.all(|MergedChunkReadDestination { value_read, .. }| {
let value_buf = value_read.value_buf.lock().unwrap();
let Ok(buf) = &*value_buf else {
return true; // on Err() there's no need to read more
};
buf.len() == value_read.len as usize
});
if all_done {
continue;
}
let (tmp, nread) = match inner
let (merged_read_buf_slice, nread) = match inner
.file
.read_at_to_end(
chunk_no * DIO_CHUNK_SIZE as u32,
chunk_buf.slice_full(),
start_chunk_no * DIO_CHUNK_SIZE as u32,
get_chunk_buf(nchunks).slice_full(),
&ctx,
)
.await
@@ -377,30 +449,30 @@ impl InMemoryLayer {
Ok(t) => t,
Err(e) => {
let e = Arc::new(e);
for ChunkReadDestination { value_read, .. } in dsts {
for MergedChunkReadDestination { value_read, .. } in dsts {
*value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e));
// this will make later reads short-circuit, see top of loop body
}
chunk_buf = get_chunk_buf(); // TODO: change API to return the buffer back on error
continue;
}
};
chunk_buf = tmp.into_inner();
let contents = &chunk_buf[..nread];
for ChunkReadDestination {
let merged_read_buf = merged_read_buf_slice.into_inner();
assert_eq!(nread, merged_read_buf.len());
let merged_read_buf = &merged_read_buf[..nread];
for MergedChunkReadDestination {
value_read,
offset_in_chunk,
offset_in_merged_read,
len,
} in dsts
{
if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() {
buf.extend_from_slice(
&contents[offset_in_chunk as usize..(offset_in_chunk + len) as usize],
&merged_read_buf[offset_in_merged_read as usize
..(offset_in_merged_read + len) as usize],
);
}
}
}
drop(chunk_buf);
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {