feat(pageserver): rewrite streaming vectored read planner (#8242)

Rewrite the streaming vectored read planner as a separate struct. The API
is designed to produce batches of roughly `max_read_size` bytes, rather than
strictly below that limit, so that `handle_XX` returns one batch at a time.
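
For reference, here is a minimal sketch of the intended call pattern (it mirrors the new unit tests in this commit; `blobs`, `max_read_size`, `max_cnt`, and `data_end_offset` are illustrative placeholders):

```rust
// Feed (key, lsn, offset) for each blob in order. `handle` lags one blob
// behind the cursor and yields at most one completed batch per call.
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, max_cnt);
let mut reads = Vec::new();
for (key, lsn, offset) in blobs {
    reads.extend(planner.handle(key, lsn, offset));
}
// Flush the trailing batch, if any, once the end offset of the data is known.
reads.extend(planner.handle_range_end(data_end_offset));
```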

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Author: Alex Chi Z
Date: 2024-07-11 14:28:16 -04:00
Committed by: GitHub
Parent: cd29156927
Commit: 38b4ed297e
3 changed files with 218 additions and 72 deletions


@@ -1321,7 +1321,7 @@ impl DeltaLayerInner {
offsets.start.pos(),
offsets.end.pos(),
meta,
- Some(max_read_size),
+ max_read_size,
))
}
} else {
@@ -1615,13 +1615,17 @@ impl<'a> DeltaLayerIterator<'a> {
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let offset = blob_ref.pos();
- if let Some(batch_plan) = self.planner.handle(key, lsn, offset, BlobFlag::None) {
+ if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
break batch_plan;
}
} else {
self.is_end = true;
let data_end_offset = self.delta_layer.index_start_offset();
- break self.planner.handle_range_end(data_end_offset);
+ if let Some(item) = self.planner.handle_range_end(data_end_offset) {
+ break item;
+ } else {
+ return Ok(()); // TODO: test empty iterator
+ }
}
};
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);


@@ -994,14 +994,17 @@ impl<'a> ImageLayerIterator<'a> {
Key::from_slice(&raw_key[..KEY_SIZE]),
self.image_layer.lsn,
offset,
- BlobFlag::None,
) {
break batch_plan;
}
} else {
self.is_end = true;
let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
- break self.planner.handle_range_end(payload_end);
+ if let Some(item) = self.planner.handle_range_end(payload_end) {
+ break item;
+ } else {
+ return Ok(()); // TODO: a test case on empty iterator
+ }
}
};
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);


@@ -68,7 +68,7 @@ impl VectoredRead {
}
}
- #[derive(Eq, PartialEq)]
+ #[derive(Eq, PartialEq, Debug)]
pub(crate) enum VectoredReadExtended {
Yes,
No,
@@ -91,7 +91,7 @@ impl VectoredReadBuilder {
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
- max_read_size: Option<usize>,
+ max_read_size: usize,
) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
@@ -102,10 +102,9 @@ impl VectoredReadBuilder {
start: start_offset,
end: end_offset,
blobs_at,
- max_read_size,
+ max_read_size: Some(max_read_size),
}
}
/// Attempt to extend the current read with a new blob if the start
/// offset matches with the current end of the vectored read
/// and the resulting size is below the max read size
@@ -164,7 +163,7 @@ pub struct VectoredReadPlanner {
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
- max_read_size: Option<usize>,
+ max_read_size: usize,
}
impl VectoredReadPlanner {
@@ -172,20 +171,7 @@ impl VectoredReadPlanner {
Self {
blobs: BTreeMap::new(),
prev: None,
- max_read_size: Some(max_read_size),
- }
- }
- /// This function should *only* be used if the caller has a way to control the limit. e.g., in [`StreamingVectoredReadPlanner`],
- /// it uses the vectored read planner to avoid duplicated logic on handling blob start/end, while expecting the vectored
- /// read planner to give a single read to a continuous range of bytes in the image layer. Therefore, it does not need the
- /// code path to split reads into chunks of `max_read_size`, and controls the read size itself.
- #[cfg(test)]
- pub(crate) fn new_caller_controlled_max_limit() -> Self {
- Self {
- blobs: BTreeMap::new(),
- prev: None,
- max_read_size: None,
+ max_read_size,
}
}
@@ -376,17 +362,18 @@ impl<'a> VectoredBlobReader<'a> {
}
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
- /// getting read blobs. It returns a batch when `handle` gets called and when the current key would exceed the read_size and
- /// max_cnt constraints. Underlying it uses [`VectoredReadPlanner`].
+ /// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
+ /// max_cnt constraints.
#[cfg(test)]
pub struct StreamingVectoredReadPlanner {
- planner: VectoredReadPlanner,
- /// Max read size per batch
+ read_builder: Option<VectoredReadBuilder>,
+ // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
+ prev: Option<(Key, Lsn, u64)>,
+ /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200) while the `max_read_size` is 150,
+ /// we will produce a single batch instead of splitting them.
max_read_size: u64,
/// Max item count per batch
max_cnt: usize,
- /// The first offset of this batch
- this_batch_first_offset: Option<u64>,
/// Size of the current batch
cnt: usize,
}
@@ -397,62 +384,88 @@ impl StreamingVectoredReadPlanner {
assert!(max_cnt > 0);
assert!(max_read_size > 0);
Self {
- // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
- // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
- // to avoid splitting into two I/Os.
- planner: VectoredReadPlanner::new_caller_controlled_max_limit(),
+ read_builder: None,
+ prev: None,
max_cnt,
max_read_size,
- this_batch_first_offset: None,
cnt: 0,
}
}
- fn emit(&mut self, this_batch_first_offset: u64) -> VectoredRead {
- let planner = std::mem::replace(
- &mut self.planner,
- VectoredReadPlanner::new_caller_controlled_max_limit(),
- );
- self.this_batch_first_offset = Some(this_batch_first_offset);
- self.cnt = 1;
- let mut batch = planner.finish();
- assert_eq!(batch.len(), 1, "should have exactly one read batch");
- batch.pop().unwrap()
+ pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
+ // Implementation note: internally lag behind by one blob such that
+ // we have a start and end offset when initialising [`VectoredRead`]
+ let (prev_key, prev_lsn, prev_offset) = match self.prev {
+ None => {
+ self.prev = Some((key, lsn, offset));
+ return None;
+ }
+ Some(prev) => prev,
+ };
+ let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
+ self.prev = Some((key, lsn, offset));
+ res
}
- pub fn handle(
+ pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
+ let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
+ self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
+ } else {
+ None
+ };
+ self.prev = None;
+ res
+ }
+ fn add_blob(
&mut self,
key: Key,
lsn: Lsn,
- offset: u64,
- flag: BlobFlag,
+ start_offset: u64,
+ end_offset: u64,
+ is_last_blob_in_read: bool,
) -> Option<VectoredRead> {
- if let Some(begin_offset) = self.this_batch_first_offset {
- // Each batch will have at least one item b/c `self.this_batch_first_offset` is set
- // after one item gets processed
- if offset - begin_offset > self.max_read_size {
- self.planner.handle_range_end(offset); // End the current batch with the offset
- let batch = self.emit(offset); // Produce a batch
- self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
- return Some(batch);
+ match &mut self.read_builder {
+ Some(read_builder) => {
+ let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
+ assert_eq!(extended, VectoredReadExtended::Yes);
}
- } else {
- self.this_batch_first_offset = Some(offset)
- }
- if self.cnt >= self.max_cnt {
- self.planner.handle_range_end(offset); // End the current batch with the offset
- let batch = self.emit(offset); // Produce a batch
- self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
- return Some(batch);
- }
- self.planner.handle(key, lsn, offset, flag); // Add this key to the current batch
- self.cnt += 1;
- None
- }
+ None => {
+ self.read_builder = {
+ let mut blobs_at = VecMap::default();
+ blobs_at
+ .append(start_offset, BlobMeta { key, lsn })
+ .expect("First insertion always succeeds");
- pub fn handle_range_end(&mut self, offset: u64) -> VectoredRead {
- self.planner.handle_range_end(offset);
- self.emit(offset)
+ Some(VectoredReadBuilder {
+ start: start_offset,
+ end: end_offset,
+ blobs_at,
+ max_read_size: None,
+ })
+ };
+ }
+ }
+ let read_builder = self.read_builder.as_mut().unwrap();
+ self.cnt += 1;
+ if is_last_blob_in_read
+ || read_builder.size() >= self.max_read_size as usize
+ || self.cnt >= self.max_cnt
+ {
+ let prev_read_builder = self.read_builder.take();
+ self.cnt = 0;
+ // `current_read_builder` is None in the first iteration
+ if let Some(read_builder) = prev_read_builder {
+ return Some(read_builder.build());
+ }
+ }
+ None
}
}
@@ -509,8 +522,11 @@ mod tests {
planner.handle_range_end(652 * 1024);
let reads = planner.finish();
assert_eq!(reads.len(), 6);
+ // TODO: could remove zero reads to produce 5 reads here
for (idx, read) in reads.iter().enumerate() {
validate_read(read, ranges[idx]);
}
@@ -548,4 +564,127 @@ mod tests {
validate_read(read, ranges[idx]);
}
}
+ #[test]
+ fn streaming_planner_max_read_size_test() {
+ let max_read_size = 128 * 1024;
+ let key = Key::MIN;
+ let lsn = Lsn(0);
+ let blob_descriptions = vec![
+ (key, lsn, 0, BlobFlag::None),
+ (key, lsn, 32 * 1024, BlobFlag::None),
+ (key, lsn, 96 * 1024, BlobFlag::None),
+ (key, lsn, 128 * 1024, BlobFlag::None),
+ (key, lsn, 198 * 1024, BlobFlag::None),
+ (key, lsn, 268 * 1024, BlobFlag::None),
+ (key, lsn, 396 * 1024, BlobFlag::None),
+ (key, lsn, 652 * 1024, BlobFlag::None),
+ ];
+ let ranges = [
+ &blob_descriptions[0..3],
+ &blob_descriptions[3..5],
+ &blob_descriptions[5..6],
+ &blob_descriptions[6..7],
+ &blob_descriptions[7..],
+ ];
+ let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
+ let mut reads = Vec::new();
+ for (key, lsn, offset, _) in blob_descriptions.clone() {
+ reads.extend(planner.handle(key, lsn, offset));
+ }
+ reads.extend(planner.handle_range_end(652 * 1024));
+ assert_eq!(reads.len(), ranges.len());
+ for (idx, read) in reads.iter().enumerate() {
+ validate_read(read, ranges[idx]);
+ }
+ }
+ #[test]
+ fn streaming_planner_max_cnt_test() {
+ let max_read_size = 1024 * 1024;
+ let key = Key::MIN;
+ let lsn = Lsn(0);
+ let blob_descriptions = vec![
+ (key, lsn, 0, BlobFlag::None),
+ (key, lsn, 32 * 1024, BlobFlag::None),
+ (key, lsn, 96 * 1024, BlobFlag::None),
+ (key, lsn, 128 * 1024, BlobFlag::None),
+ (key, lsn, 198 * 1024, BlobFlag::None),
+ (key, lsn, 268 * 1024, BlobFlag::None),
+ (key, lsn, 396 * 1024, BlobFlag::None),
+ (key, lsn, 652 * 1024, BlobFlag::None),
+ ];
+ let ranges = [
+ &blob_descriptions[0..2],
+ &blob_descriptions[2..4],
+ &blob_descriptions[4..6],
+ &blob_descriptions[6..],
+ ];
+ let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
+ let mut reads = Vec::new();
+ for (key, lsn, offset, _) in blob_descriptions.clone() {
+ reads.extend(planner.handle(key, lsn, offset));
+ }
+ reads.extend(planner.handle_range_end(652 * 1024));
+ assert_eq!(reads.len(), ranges.len());
+ for (idx, read) in reads.iter().enumerate() {
+ validate_read(read, ranges[idx]);
+ }
+ }
+ #[test]
+ fn streaming_planner_edge_test() {
+ let max_read_size = 1024 * 1024;
+ let key = Key::MIN;
+ let lsn = Lsn(0);
+ {
+ let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
+ let mut reads = Vec::new();
+ reads.extend(planner.handle_range_end(652 * 1024));
+ assert!(reads.is_empty());
+ }
+ {
+ let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
+ let mut reads = Vec::new();
+ reads.extend(planner.handle(key, lsn, 0));
+ reads.extend(planner.handle_range_end(652 * 1024));
+ assert_eq!(reads.len(), 1);
+ validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
+ }
+ {
+ let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
+ let mut reads = Vec::new();
+ reads.extend(planner.handle(key, lsn, 0));
+ reads.extend(planner.handle(key, lsn, 128 * 1024));
+ reads.extend(planner.handle_range_end(652 * 1024));
+ assert_eq!(reads.len(), 2);
+ validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
+ validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
+ }
+ {
+ let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
+ let mut reads = Vec::new();
+ reads.extend(planner.handle(key, lsn, 0));
+ reads.extend(planner.handle(key, lsn, 128 * 1024));
+ reads.extend(planner.handle_range_end(652 * 1024));
+ assert_eq!(reads.len(), 1);
+ validate_read(
+ &reads[0],
+ &[
+ (key, lsn, 0, BlobFlag::None),
+ (key, lsn, 128 * 1024, BlobFlag::None),
+ ],
+ );
+ }
+ }
}