pageserver: use correct will_init for streaming planner

`BlobMeta::will_init` is not actually used on these code paths,
but let's be kind to future ourselves and make sure it's correct.
This commit is contained in:
Vlad Lazar
2024-10-10 19:43:12 +02:00
parent 0a840fbdbd
commit 11990ef2fe
3 changed files with 42 additions and 21 deletions

View File

@@ -1539,7 +1539,9 @@ impl<'a> DeltaLayerIterator<'a> {
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let offset = blob_ref.pos();
if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
if let Some(batch_plan) =
self.planner.handle(key, lsn, offset, blob_ref.will_init())
{
break batch_plan;
}
} else {

View File

@@ -1073,6 +1073,7 @@ impl<'a> ImageLayerIterator<'a> {
Key::from_slice(&raw_key[..KEY_SIZE]),
self.image_layer.lsn,
offset,
true,
) {
break batch_plan;
}

View File

@@ -539,7 +539,7 @@ impl<'a> VectoredBlobReader<'a> {
pub struct StreamingVectoredReadPlanner {
read_builder: Option<ChunkedVectoredReadBuilder>,
// Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64)>,
prev: Option<(Key, Lsn, u64, bool)>,
/// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
/// we will produce a single batch instead of split them.
max_read_size: u64,
@@ -562,27 +562,47 @@ impl StreamingVectoredReadPlanner {
}
}
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
pub fn handle(
&mut self,
key: Key,
lsn: Lsn,
offset: u64,
will_init: bool,
) -> Option<VectoredRead> {
// Implementation note: internally lag behind by one blob such that
// we have a start and end offset when initialising [`VectoredRead`]
let (prev_key, prev_lsn, prev_offset) = match self.prev {
let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
None => {
self.prev = Some((key, lsn, offset));
self.prev = Some((key, lsn, offset, will_init));
return None;
}
Some(prev) => prev,
};
let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
let res = self.add_blob(
prev_key,
prev_lsn,
prev_offset,
offset,
false,
prev_will_init,
);
self.prev = Some((key, lsn, offset));
self.prev = Some((key, lsn, offset, will_init));
res
}
pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
self.add_blob(
prev_key,
prev_lsn,
prev_offset,
offset,
true,
prev_will_init,
)
} else {
None
};
@@ -599,7 +619,7 @@ impl StreamingVectoredReadPlanner {
start_offset: u64,
end_offset: u64,
is_last_blob_in_read: bool,
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
will_init: bool,
) -> Option<VectoredRead> {
match &mut self.read_builder {
Some(read_builder) => {
@@ -609,8 +629,7 @@ impl StreamingVectoredReadPlanner {
BlobMeta {
key,
lsn,
// TODO(vlad): figure this out ...
will_init: false,
will_init,
},
);
assert_eq!(extended, VectoredReadExtended::Yes);
@@ -623,8 +642,7 @@ impl StreamingVectoredReadPlanner {
BlobMeta {
key,
lsn,
// TODO(vlad): figure this out ...
will_init: false,
will_init,
},
))
};
@@ -839,7 +857,7 @@ mod tests {
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
let mut reads = Vec::new();
for (key, lsn, offset, _) in blob_descriptions.clone() {
reads.extend(planner.handle(key, lsn, offset));
reads.extend(planner.handle(key, lsn, offset, false));
}
reads.extend(planner.handle_range_end(652 * 1024));
@@ -877,7 +895,7 @@ mod tests {
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
let mut reads = Vec::new();
for (key, lsn, offset, _) in blob_descriptions.clone() {
reads.extend(planner.handle(key, lsn, offset));
reads.extend(planner.handle(key, lsn, offset, false));
}
reads.extend(planner.handle_range_end(652 * 1024));
@@ -902,7 +920,7 @@ mod tests {
{
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
let mut reads = Vec::new();
reads.extend(planner.handle(key, lsn, 0));
reads.extend(planner.handle(key, lsn, 0, false));
reads.extend(planner.handle_range_end(652 * 1024));
assert_eq!(reads.len(), 1);
validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
@@ -910,8 +928,8 @@ mod tests {
{
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
let mut reads = Vec::new();
reads.extend(planner.handle(key, lsn, 0));
reads.extend(planner.handle(key, lsn, 128 * 1024));
reads.extend(planner.handle(key, lsn, 0, false));
reads.extend(planner.handle(key, lsn, 128 * 1024, false));
reads.extend(planner.handle_range_end(652 * 1024));
assert_eq!(reads.len(), 2);
validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
@@ -920,8 +938,8 @@ mod tests {
{
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
let mut reads = Vec::new();
reads.extend(planner.handle(key, lsn, 0));
reads.extend(planner.handle(key, lsn, 128 * 1024));
reads.extend(planner.handle(key, lsn, 0, false));
reads.extend(planner.handle(key, lsn, 128 * 1024, false));
reads.extend(planner.handle_range_end(652 * 1024));
assert_eq!(reads.len(), 1);
validate_read(