Request multiple block numbers in a single GetPageV request

That's how it was always intended to be used
This commit is contained in:
Heikki Linnakangas
2025-07-04 15:49:04 +03:00
parent da3f9ee72d
commit 70bf2e088d

View File

@@ -504,31 +504,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
.max()
.unwrap();
// TODO: Use batched protocol
for (blkno, _lsn, dest, _guard) in cache_misses.iter() {
let read_lsn = self.request_lsns(not_modified_since);
match self
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_class: page_api::GetPageClass::Normal,
read_lsn,
rel,
block_numbers: vec![*blkno],
})
.await
{
Ok(resp) => {
// Write the received page image directly to the shared memory location
// that the backend requested.
if resp.page_images.len() != 1 {
error!(
"received unexpected response with {} page images received from pageserver for a request for one page",
resp.page_images.len()
);
return Err(-1);
}
let page_image = resp.page_images[0].clone();
// Form a pageserver request for the cache misses
let read_lsn = self.request_lsns(not_modified_since);
let block_numbers: Vec<u32> = cache_misses.iter().map(|(blkno, _lsn, _dest, _guard)| *blkno).collect();
info!(
"sending getpage request for blocks {:?} in rel {:?} lsns {}",
block_numbers, rel, read_lsn
);
match self
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_class: page_api::GetPageClass::Normal,
read_lsn,
rel,
block_numbers: block_numbers.clone(),
})
.await
{
Ok(resp) => {
// Write the received page images directly to the shared memory location
// that the backend requested.
if resp.page_images.len() != block_numbers.len() {
error!(
"received unexpected response with {} page images from pageserver for a request for {} pages",
resp.page_images.len(), block_numbers.len(),
);
return Err(-1);
}
for (page_image, (blkno, _lsn, dest, _guard)) in resp.page_images.into_iter().zip(cache_misses) {
let src: &[u8] = page_image.as_ref();
let len = std::cmp::min(src.len(), dest.bytes_total());
unsafe {
@@ -539,17 +544,17 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
self.cache
.remember_page(
&rel,
*blkno,
blkno,
page_image,
read_lsn.not_modified_since_lsn.unwrap(),
false,
)
.await;
}
Err(err) => {
info!("tonic error: {err:?}");
return Err(-1);
}
},
Err(err) => {
info!("tonic error: {err:?}");
return Err(-1);
}
}
Ok(())
@@ -588,44 +593,44 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
.map(|(_blkno, lsn, _guard)| *lsn)
.max()
.unwrap();
let block_numbers: Vec<u32> = cache_misses.iter().map(|(blkno, _lsn, _guard)| *blkno).collect();
// TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
// in-flight requests
// TODO: Use batched protocol
for (blkno, _lsn, _guard) in cache_misses.iter() {
match self
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_class: page_api::GetPageClass::Prefetch,
read_lsn: self.request_lsns(not_modified_since),
rel,
block_numbers: vec![*blkno],
})
.await
{
Ok(resp) => {
trace!(
"prefetch completed, remembering blk {} in rel {:?} in LFC",
*blkno, rel
match self
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_class: page_api::GetPageClass::Prefetch,
read_lsn: self.request_lsns(not_modified_since),
rel,
block_numbers: block_numbers.clone(),
})
.await
{
Ok(resp) => {
trace!(
"prefetch completed, remembering blocks {:?} in rel {:?} in LFC",
block_numbers, rel
);
if resp.page_images.len() != block_numbers.len() {
error!(
"received unexpected response with {} page images from pageserver for a request for {} pages",
resp.page_images.len(), block_numbers.len(),
);
if resp.page_images.len() != 1 {
error!(
"received unexpected response with {} page images received from pageserver for a request for one page",
resp.page_images.len()
);
return Err(-1);
}
let page_image = resp.page_images[0].clone();
self.cache
.remember_page(&rel, *blkno, page_image, not_modified_since, false)
.await;
}
Err(err) => {
info!("tonic error: {err:?}");
return Err(-1);
}
for (page_image, (blkno, _lsn, _guard)) in resp.page_images.into_iter().zip(cache_misses) {
self.cache
.remember_page(&rel, blkno, page_image, not_modified_since, false)
.await;
}
},
Err(err) => {
info!("tonic error: {err:?}");
return Err(-1);
}
}
Ok(())