diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b867cb0333..cde736ed9d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -34,11 +34,14 @@ use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, }; use crate::tenant::timeline::GetVectoredError; +use crate::tenant::vectored_blob_io::{ + BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner, +}; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use hex; use pageserver_api::keyspace::KeySpace; @@ -154,6 +157,7 @@ pub struct ImageLayerInner { /// Reader object for reading blocks from the file. file: FileBlockReader, + vectored_blob_reader: VectoredBlobReader, } impl std::fmt::Debug for ImageLayerInner { @@ -210,7 +214,7 @@ impl ImageLayer { return Ok(()); } - let inner = self.load(LayerAccessKind::Dump, ctx).await?; + let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?; inner.dump(ctx).await?; @@ -240,21 +244,32 @@ impl ImageLayer { async fn load( &self, access_kind: LayerAccessKind, + max_vectored_read_size: usize, ctx: &RequestContext, ) -> Result<&ImageLayerInner> { self.access_stats.record_access(access_kind, ctx); self.inner - .get_or_try_init(|| self.load_inner(ctx)) + .get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx)) .await .with_context(|| format!("Failed to load image layer {}", self.path())) } - async fn load_inner(&self, ctx: &RequestContext) -> Result { + async fn load_inner( + &self, + max_vectored_read_size: usize, + ctx: &RequestContext, + ) -> Result { let path = self.path(); - let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx) - .await - .and_then(|res| res)?; + let loaded = ImageLayerInner::load( + &path, + self.desc.image_layer_lsn(), + None, + max_vectored_read_size, + ctx, + ) + .await + .and_then(|res| res)?; // not production code let actual_filename = path.file_name().unwrap().to_owned(); @@ -361,14 +376,15 @@ impl ImageLayerInner { path: &Utf8Path, lsn: Lsn, summary: Option, + max_vectored_read_size: usize, ctx: &RequestContext, ) -> Result, anyhow::Error> { let file = match VirtualFile::open(path).await { Ok(file) => file, Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), }; - let file = FileBlockReader::new(file); - let summary_blk = match file.read_blk(0, ctx).await { + let block_reader = FileBlockReader::new(file); + let summary_blk = match block_reader.read_blk(0, ctx).await { Ok(blk) => blk, Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))), }; @@ -394,11 +410,19 @@ impl ImageLayerInner { } } + // TODO: don't open file twice + let file = match VirtualFile::open(path).await { + Ok(file) => file, + Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), + }; + let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size); + Ok(Ok(ImageLayerInner { index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, lsn, - file, + file: block_reader, + vectored_blob_reader, })) } @@ -449,12 +473,30 @@ impl ImageLayerInner { reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { + let reads = self + .plan_reads(keyspace, ctx) + .await + .map_err(GetVectoredError::Other)?; + + self.do_reads_and_update_state(reads, reconstruct_state) + .await; + + Ok(()) + } + + async fn plan_reads( + &self, + keyspace: KeySpace, + ctx: &RequestContext, + ) -> anyhow::Result> { + let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size()); + let file = &self.file; let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file); - let mut offsets = Vec::new(); - for range in keyspace.ranges.iter() { + let mut range_end_handled = false; + let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; range.start.write_to_byte_slice(&mut search_key); @@ -462,17 +504,18 @@ impl ImageLayerInner { .visit( &search_key, VisitDirection::Forwards, - |raw_key, value| { + |raw_key, offset| { let key = Key::from_slice(&raw_key[..KEY_SIZE]); assert!(key >= range.start); - if !range.contains(&key) { - return false; + if key >= range.end { + planner.handle_range_end(offset); + range_end_handled = true; + false + } else { + planner.handle(key, self.lsn, offset, BlobFlag::None); + true } - - offsets.push((key, value)); - - true }, &RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::ImageLayerBtreeNode) @@ -480,33 +523,64 @@ impl ImageLayerInner { ) .await .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; - } - let ctx = &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::ImageLayerValue) - .build(); - - let cursor = file.block_cursor(); - let mut buf = Vec::new(); - for (key, offset) in offsets { - let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await; - if let Err(e) = res { - reconstruct_state.on_key_error( - key, - PageReconstructError::from(anyhow!(e).context(format!( - "Failed to read blob from virtual file {}", - file.file.path - ))), - ); - - continue; + if !range_end_handled { + let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64; + planner.handle_range_end(payload_end); } - - let blob = Bytes::copy_from_slice(buf.as_slice()); - reconstruct_state.update_key(&key, self.lsn, Value::Image(blob)); } - Ok(()) + Ok(planner.finish()) + } + + async fn do_reads_and_update_state( + &self, + reads: Vec, + reconstruct_state: &mut ValuesReconstructState, + ) { + let mut buf = Some(BytesMut::with_capacity( + self.vectored_blob_reader.get_max_read_size(), + )); + for read in reads.into_iter().rev() { + let res = self + .vectored_blob_reader + .read_blobs(&read, buf.take().expect("Should have a buffer")) + .await; + + match res { + Ok(blobs_buf) => { + for meta in blobs_buf.blobs.iter().rev() { + let img_buf = Bytes::copy_from_slice(&blobs_buf.buf[meta.start..meta.end]); + reconstruct_state.update_key( + &meta.meta.key, + self.lsn, + Value::Image(img_buf), + ); + } + + buf = Some(blobs_buf.buf); + } + Err(err) => { + let kind = err.kind(); + for (_, blob_meta) in read.blobs_at.as_slice() { + reconstruct_state.on_key_error( + blob_meta.key, + PageReconstructError::from(anyhow!( + "Failed to read blobs from virtual file {}: {}", + self.vectored_blob_reader.get_file_ref().path, + kind + )), + ); + } + + // We have "lost" the buffer since the lower level IO api + // doesn't return the buffer on error. Allocate a new one. + buf = Some(BytesMut::with_capacity( + self.vectored_blob_reader.get_max_read_size(), + )); + } + }; + } } } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index cc5b7ade6a..7d5c5a7514 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1310,9 +1310,15 @@ impl DownloadedLayer { owner.desc.key_range.clone(), lsn, )); - image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx) - .await - .map(|res| res.map(LayerKind::Image)) + image_layer::ImageLayerInner::load( + &owner.path, + lsn, + summary, + owner.conf.max_vectored_read_size, + ctx, + ) + .await + .map(|res| res.map(LayerKind::Image)) }; match res {