diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 73c018db31..55f5bdd140 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -329,10 +329,20 @@ impl ReadableLayerDesc { ctx: &RequestContext, ) -> Result<(), GetVectoredError> { match self { - ReadableLayerDesc::Persistent { desc, lsn_ceil, .. } => { + ReadableLayerDesc::Persistent { + desc, + lsn_floor, + lsn_ceil, + } => { let layer = layer_manager.get_from_desc(desc); layer - .get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx) + .get_values_reconstruct_data( + keyspace, + *lsn_floor, + *lsn_ceil, + reconstruct_state, + ctx, + ) .await } ReadableLayerDesc::InMemory { handle, lsn_ceil } => { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 19eebf5531..dfb4b7bc64 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -36,18 +36,21 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::timeline::GetVectoredError; +use crate::tenant::vectored_blob_io::{ + BlobFlag, VectoredBlobReader, VectoredRead, VectoredReadPlanner, +}; use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{anyhow, bail, ensure, Context, Result}; +use bytes::BytesMut; use camino::{Utf8Path, Utf8PathBuf}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; @@ -63,8 +66,7 @@ use utils::{ }; use super::{ - AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation, - ValuesReconstructState, + AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState, }; /// @@ -213,6 +215,7 @@ pub struct DeltaLayerInner { // values copied from summary index_start_blk: u32, index_root_blk: u32, + vectored_blob_reader: VectoredBlobReader, /// Reader object for reading blocks from the file. file: FileBlockReader, @@ -248,7 +251,7 @@ impl DeltaLayer { return Ok(()); } - let inner = self.load(LayerAccessKind::Dump, ctx).await?; + let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?; inner.dump(ctx).await } @@ -284,20 +287,25 @@ impl DeltaLayer { async fn load( &self, access_kind: LayerAccessKind, + max_vectored_read_size: usize, ctx: &RequestContext, ) -> Result<&Arc> { self.access_stats.record_access(access_kind, ctx); // Quick exit if already loaded self.inner - .get_or_try_init(|| self.load_inner(ctx)) + .get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx)) .await .with_context(|| format!("Failed to load delta layer {}", self.path())) } - async fn load_inner(&self, ctx: &RequestContext) -> Result> { + async fn load_inner( + &self, + max_vectored_read_size: usize, + ctx: &RequestContext, + ) -> Result> { let path = self.path(); - let loaded = DeltaLayerInner::load(&path, None, ctx) + let loaded = DeltaLayerInner::load(&path, None, max_vectored_read_size, ctx) .await .and_then(|res| res)?; @@ -698,15 +706,16 @@ impl DeltaLayerInner { pub(super) async fn load( path: &Utf8Path, summary: Option, + max_vectored_read_size: usize, ctx: &RequestContext, ) -> Result, anyhow::Error> { let file = match VirtualFile::open(path).await { Ok(file) => file, Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), }; - let file = FileBlockReader::new(file); + let block_reader = FileBlockReader::new(file); - let summary_blk = match file.read_blk(0, ctx).await { + let summary_blk = match block_reader.read_blk(0, ctx).await { Ok(blk) => blk, Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))), }; @@ -728,8 +737,16 @@ impl DeltaLayerInner { } } + // TODO: don't open file twice + let file = match VirtualFile::open(path).await { + Ok(file) => file, + Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), + }; + let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size); + Ok(Ok(DeltaLayerInner { - file, + file: block_reader, + vectored_blob_reader, index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, })) @@ -834,10 +851,31 @@ impl DeltaLayerInner { pub(super) async fn get_values_reconstruct_data( &self, keyspace: KeySpace, + start_lsn: Lsn, end_lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { + let reads = self + .plan_reads(keyspace, start_lsn..end_lsn, reconstruct_state, ctx) + .await + .map_err(GetVectoredError::Other)?; + + self.do_reads_and_update_state(reads, reconstruct_state) + .await; + + Ok(()) + } + + async fn plan_reads( + &self, + keyspace: KeySpace, + lsn_range: Range, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> anyhow::Result> { + let mut planner = VectoredReadPlanner::new(self.vectored_blob_reader.get_max_read_size()); + let file = &self.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( self.index_start_blk, @@ -845,110 +883,130 @@ impl DeltaLayerInner { file, ); - let mut offsets: BTreeMap> = BTreeMap::new(); - for range in keyspace.ranges.iter() { - let mut ignore_key = None; + let mut range_end_handled = false; - // Scan the page versions backwards, starting from the last key in the range. - // to collect all the offsets at which need to be read. - let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1)); + let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); tree_reader .visit( - &end_key.0, - VisitDirection::Backwards, + &start_key.0, + VisitDirection::Forwards, |raw_key, value| { let key = Key::from_slice(&raw_key[..KEY_SIZE]); - let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key); - - if entry_lsn >= end_lsn { - return true; - } - - if key < range.start { - return false; - } - - if key >= range.end { - return true; - } - - if Some(key) == ignore_key { - return true; - } - - if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) { - if entry_lsn <= cached_lsn { - return key != range.start; - } - } - + let lsn = DeltaKey::extract_lsn_from_buf(raw_key); let blob_ref = BlobRef(value); - let lsns_at = offsets.entry(key).or_default(); - lsns_at.push((entry_lsn, blob_ref.pos())); - if blob_ref.will_init() { - if key == range.start { - return false; + assert!(key >= range.start && lsn >= lsn_range.start); + + let cached_lsn = reconstruct_state.get_cached_lsn(&key); + let flag = { + if cached_lsn >= Some(lsn) { + BlobFlag::Ignore + } else if blob_ref.will_init() { + BlobFlag::Replaces } else { - ignore_key = Some(key); - return true; + BlobFlag::None } - } + }; - true + if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) { + planner.handle_range_end(blob_ref.pos()); + range_end_handled = true; + false + } else { + planner.handle(key, lsn, blob_ref.pos(), flag); + true + } }, &RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::DeltaLayerBtreeNode) .build(), ) .await - .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; - } + .map_err(|err| anyhow!(err))?; - let ctx = &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::DeltaLayerValue) - .build(); - - let cursor = file.block_cursor(); - let mut buf = Vec::new(); - for (key, lsns_at) in offsets { - for (lsn, block_offset) in lsns_at { - let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await; - - if let Err(e) = res { - reconstruct_state.on_key_error( - key, - PageReconstructError::from(anyhow!(e).context(format!( - "Failed to read blob from virtual file {}", - file.file.path - ))), - ); - - break; - } - - let value = Value::des(&buf); - if let Err(e) = value { - reconstruct_state.on_key_error( - key, - PageReconstructError::from(anyhow!(e).context(format!( - "Failed to deserialize file blob from virtual file {}", - file.file.path - ))), - ); - - break; - } - - let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap()); - if key_situation == ValueReconstructSituation::Complete { - break; - } + if !range_end_handled { + let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64; + tracing::info!("Handling range end fallback at {}", payload_end); + planner.handle_range_end(payload_end); } } - Ok(()) + Ok(planner.finish()) + } + + async fn do_reads_and_update_state( + &self, + reads: Vec, + reconstruct_state: &mut ValuesReconstructState, + ) { + let mut ignore_key_with_err = None; + + let mut buf = Some(BytesMut::with_capacity( + self.vectored_blob_reader.get_max_read_size(), + )); + + for read in reads.into_iter().rev() { + let res = self + .vectored_blob_reader + .read_blobs(&read, buf.take().expect("Should have a buffer")) + .await; + + let blobs_buf = match res { + Ok(blobs_buf) => blobs_buf, + Err(err) => { + let kind = err.kind(); + for (_, blob_meta) in read.blobs_at.as_slice() { + reconstruct_state.on_key_error( + blob_meta.key, + PageReconstructError::from(anyhow!( + "Failed to read blobs from virtual file {}: {}", + self.vectored_blob_reader.get_file_ref().path, + kind + )), + ); + } + + // We have "lost" the buffer since the lower level IO api + // doesn't return the buffer on error. Allocate a new one. + buf = Some(BytesMut::with_capacity( + self.vectored_blob_reader.get_max_read_size(), + )); + + continue; + } + }; + + for meta in blobs_buf.blobs.iter().rev() { + if Some(meta.meta.key) == ignore_key_with_err { + continue; + } + + let value = Value::des(&blobs_buf.buf[meta.start..meta.end]); + let value = match value { + Ok(v) => v, + Err(e) => { + reconstruct_state.on_key_error( + meta.meta.key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to deserialize blob from virtual file {}", + self.vectored_blob_reader.get_file_ref().path, + ))), + ); + + ignore_key_with_err = Some(meta.meta.key); + continue; + } + }; + + // Invariant: once a key reaches [`ValueReconstructSituation::Complete`] + // state, no further updates shall be made to it. The call below will + // panic if the invariant is violated. + reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value); + } + + buf = Some(blobs_buf.buf); + } } pub(super) async fn load_keys<'a>( diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 7d5c5a7514..5a8589d9f0 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -267,6 +267,7 @@ impl Layer { pub(crate) async fn get_values_reconstruct_data( &self, keyspace: KeySpace, + start_lsn: Lsn, end_lsn: Lsn, reconstruct_data: &mut ValuesReconstructState, ctx: &RequestContext, @@ -282,7 +283,14 @@ impl Layer { .record_access(LayerAccessKind::GetValueReconstructData, ctx); layer - .get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx) + .get_values_reconstruct_data( + keyspace, + start_lsn, + end_lsn, + reconstruct_data, + &self.0, + ctx, + ) .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)) .await } @@ -1299,9 +1307,14 @@ impl DownloadedLayer { owner.desc.key_range.clone(), owner.desc.lsn_range.clone(), )); - delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx) - .await - .map(|res| res.map(LayerKind::Delta)) + delta_layer::DeltaLayerInner::load( + &owner.path, + summary, + owner.conf.max_vectored_read_size, + ctx, + ) + .await + .map(|res| res.map(LayerKind::Delta)) } else { let lsn = owner.desc.image_layer_lsn(); let summary = Some(image_layer::Summary::expected( @@ -1371,6 +1384,7 @@ impl DownloadedLayer { async fn get_values_reconstruct_data( &self, keyspace: KeySpace, + start_lsn: Lsn, end_lsn: Lsn, reconstruct_data: &mut ValuesReconstructState, owner: &Arc, @@ -1380,7 +1394,7 @@ impl DownloadedLayer { match self.get(owner, ctx).await.map_err(GetVectoredError::from)? { Delta(d) => { - d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx) + d.get_values_reconstruct_data(keyspace, start_lsn, end_lsn, reconstruct_data, ctx) .await } Image(i) => {