From 70bf2e088d791463aae77a6422ed4e7bc929e058 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 4 Jul 2025 15:49:04 +0300 Subject: [PATCH] Request multiple block numbers in a single GetPageV request That's how it was always intended to be used --- .../src/worker_process/main_loop.rs | 127 +++++++++--------- 1 file changed, 66 insertions(+), 61 deletions(-) diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 6d72a62131..a96ea04706 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -504,31 +504,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { .max() .unwrap(); - // TODO: Use batched protocol - for (blkno, _lsn, dest, _guard) in cache_misses.iter() { - let read_lsn = self.request_lsns(not_modified_since); - match self - .client - .get_page(page_api::GetPageRequest { - request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), - request_class: page_api::GetPageClass::Normal, - read_lsn, - rel, - block_numbers: vec![*blkno], - }) - .await - { - Ok(resp) => { - // Write the received page image directly to the shared memory location - // that the backend requested. - if resp.page_images.len() != 1 { - error!( - "received unexpected response with {} page images received from pageserver for a request for one page", - resp.page_images.len() - ); - return Err(-1); - } - let page_image = resp.page_images[0].clone(); + // Form a pageserver request for the cache misses + let read_lsn = self.request_lsns(not_modified_since); + + let block_numbers: Vec = cache_misses.iter().map(|(blkno, _lsn, _dest, _guard)| *blkno).collect(); + info!( + "sending getpage request for blocks {:?} in rel {:?} lsns {}", + block_numbers, rel, read_lsn + ); + match self + .client + .get_page(page_api::GetPageRequest { + request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), + request_class: page_api::GetPageClass::Normal, + read_lsn, + rel, + block_numbers: block_numbers.clone(), + }) + .await + { + Ok(resp) => { + // Write the received page images directly to the shared memory location + // that the backend requested. + if resp.page_images.len() != block_numbers.len() { + error!( + "received unexpected response with {} page images from pageserver for a request for {} pages", + resp.page_images.len(), block_numbers.len(), + ); + return Err(-1); + } + for (page_image, (blkno, _lsn, dest, _guard)) in resp.page_images.into_iter().zip(cache_misses) { let src: &[u8] = page_image.as_ref(); let len = std::cmp::min(src.len(), dest.bytes_total()); unsafe { @@ -539,17 +544,17 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { self.cache .remember_page( &rel, - *blkno, + blkno, page_image, read_lsn.not_modified_since_lsn.unwrap(), false, ) .await; } - Err(err) => { - info!("tonic error: {err:?}"); - return Err(-1); - } + }, + Err(err) => { + info!("tonic error: {err:?}"); + return Err(-1); } } Ok(()) @@ -588,44 +593,44 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { .map(|(_blkno, lsn, _guard)| *lsn) .max() .unwrap(); + let block_numbers: Vec = cache_misses.iter().map(|(blkno, _lsn, _guard)| *blkno).collect(); // TODO: spawn separate tasks for these. Use the integrated cache to keep track of the // in-flight requests - // TODO: Use batched protocol - for (blkno, _lsn, _guard) in cache_misses.iter() { - match self - .client - .get_page(page_api::GetPageRequest { - request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), - request_class: page_api::GetPageClass::Prefetch, - read_lsn: self.request_lsns(not_modified_since), - rel, - block_numbers: vec![*blkno], - }) - .await - { - Ok(resp) => { - trace!( - "prefetch completed, remembering blk {} in rel {:?} in LFC", - *blkno, rel + match self + .client + .get_page(page_api::GetPageRequest { + request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), + request_class: page_api::GetPageClass::Prefetch, + read_lsn: self.request_lsns(not_modified_since), + rel, + block_numbers: block_numbers.clone(), + }) + .await + { + Ok(resp) => { + trace!( + "prefetch completed, remembering blocks {:?} in rel {:?} in LFC", + block_numbers, rel + ); + if resp.page_images.len() != block_numbers.len() { + error!( + "received unexpected response with {} page images from pageserver for a request for {} pages", + resp.page_images.len(), block_numbers.len(), ); - if resp.page_images.len() != 1 { - error!( - "received unexpected response with {} page images received from pageserver for a request for one page", - resp.page_images.len() - ); - return Err(-1); - } - let page_image = resp.page_images[0].clone(); - self.cache - .remember_page(&rel, *blkno, page_image, not_modified_since, false) - .await; - } - Err(err) => { - info!("tonic error: {err:?}"); return Err(-1); } + + for (page_image, (blkno, _lsn, _guard)) in resp.page_images.into_iter().zip(cache_misses) { + self.cache + .remember_page(&rel, blkno, page_image, not_modified_since, false) + .await; + } + }, + Err(err) => { + info!("tonic error: {err:?}"); + return Err(-1); } } Ok(())