diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b7132ee3bf..466d95f46d 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -47,6 +47,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::BytesMut; use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; +use itertools::Itertools; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; @@ -946,6 +947,34 @@ impl DeltaLayerInner { Ok(planner.finish()) } + fn get_min_read_buffer_size( + planned_reads: &[VectoredRead], + read_size_soft_max: usize, + ) -> usize { + let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else { + return read_size_soft_max; + }; + + let largest_read_size = largest_read.size(); + if largest_read_size > read_size_soft_max { + // If the read is oversized, it should only contain one key. + let offenders = largest_read + .blobs_at + .as_slice() + .iter() + .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn)) + .join(", "); + tracing::warn!( + "Oversized vectored read ({} > {}) for keys {}", + largest_read_size, + read_size_soft_max, + offenders + ); + } + + largest_read_size + } + async fn do_reads_and_update_state( &self, reads: Vec, @@ -959,7 +988,8 @@ impl DeltaLayerInner { .expect("Layer is loaded with max vectored bytes config") .0 .into(); - let mut buf = Some(BytesMut::with_capacity(max_vectored_read_bytes)); + let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes); + let mut buf = Some(BytesMut::with_capacity(buf_size)); // Note that reads are processed in reverse order (from highest key+lsn). // This is the order that `ReconstructState` requires such that it can @@ -986,7 +1016,7 @@ impl DeltaLayerInner { // We have "lost" the buffer since the lower level IO api // doesn't return the buffer on error. Allocate a new one. - buf = Some(BytesMut::with_capacity(max_vectored_read_bytes)); + buf = Some(BytesMut::with_capacity(buf_size)); continue; } @@ -1210,9 +1240,16 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del mod test { use std::collections::BTreeMap; + use itertools::MinMaxResult; + use rand::prelude::{SeedableRng, SliceRandom, StdRng}; + use rand::RngCore; + use super::*; use crate::{ - context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk, + context::DownloadBehavior, + task_mgr::TaskKind, + tenant::{disk_btree::tests::TestDisk, harness::TenantHarness}, + DEFAULT_PG_VERSION, }; /// Construct an index for a fictional delta layer and and then @@ -1332,4 +1369,229 @@ mod test { assert_eq!(planned_blobs, expected_blobs); } + + mod constants { + use utils::lsn::Lsn; + + /// Offset used by all lsns in this test + pub(super) const LSN_OFFSET: Lsn = Lsn(0x08); + /// Number of unique keys including in the test data + pub(super) const KEY_COUNT: u8 = 60; + /// Max number of different lsns for each key + pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20; + /// Possible value sizes for each key along with a probability weight + pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)]; + /// Probability that there will be a gap between the current key and the next one (33.3%) + pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)]; + /// The minimum size of a key range in all the generated reads + pub(super) const MIN_RANGE_SIZE: i128 = 10; + /// The number of ranges included in each vectored read + pub(super) const RANGES_COUNT: u8 = 2; + /// The number of vectored reads performed + pub(super) const READS_COUNT: u8 = 100; + /// Soft max size of a vectored read. Will be violated if we have to read keys + /// with values larger than the limit + pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024; + } + + struct Entry { + key: Key, + lsn: Lsn, + value: Vec, + } + + fn generate_entries(rng: &mut StdRng) -> Vec { + let mut current_key = Key::MIN; + + let mut entries = Vec::new(); + for _ in 0..constants::KEY_COUNT { + let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY); + let mut lsns_iter = + std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| { + Some(Lsn(lsn.0 + 0x08)) + }); + let mut lsns = Vec::new(); + while lsns.len() < count as usize { + let take = rng.gen_bool(0.5); + let lsn = lsns_iter.next().unwrap(); + if take { + lsns.push(lsn); + } + } + + for lsn in lsns { + let size = constants::VALUE_SIZES + .choose_weighted(rng, |item| item.1) + .unwrap() + .0; + let mut buf = vec![0; size]; + rng.fill_bytes(&mut buf); + + entries.push(Entry { + key: current_key, + lsn, + value: buf, + }) + } + + let gap = constants::KEY_GAP_CHANGES + .choose_weighted(rng, |item| item.1) + .unwrap() + .0; + if gap { + current_key = current_key.add(2); + } else { + current_key = current_key.add(1); + } + } + + entries + } + + struct EntriesMeta { + key_range: Range, + lsn_range: Range, + index: BTreeMap<(Key, Lsn), Vec>, + } + + fn get_entries_meta(entries: &[Entry]) -> EntriesMeta { + let key_range = match entries.iter().minmax_by_key(|e| e.key) { + MinMaxResult::MinMax(min, max) => min.key..max.key.next(), + _ => panic!("More than one entry is always expected"), + }; + + let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) { + MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1), + _ => panic!("More than one entry is always expected"), + }; + + let mut index = BTreeMap::new(); + for entry in entries.iter() { + index.insert((entry.key, entry.lsn), entry.value.clone()); + } + + EntriesMeta { + key_range, + lsn_range, + index, + } + } + + fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range) -> KeySpace { + let start = key_range.start.to_i128(); + let end = key_range.end.to_i128(); + + let mut keyspace = KeySpace::default(); + + for _ in 0..constants::RANGES_COUNT { + let mut range: Option> = Option::default(); + while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) { + let range_start = rng.gen_range(start..end); + let range_end_offset = range_start + constants::MIN_RANGE_SIZE; + if range_end_offset >= end { + range = Some(Key::from_i128(range_start)..Key::from_i128(end)); + } else { + let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end); + range = Some(Key::from_i128(range_start)..Key::from_i128(range_end)); + } + } + keyspace.ranges.push(range.unwrap()); + } + + keyspace + } + + #[tokio::test] + async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?; + let (tenant, ctx) = harness.load().await; + + let timeline_id = TimelineId::generate(); + let timeline = tenant + .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx) + .await?; + + tracing::info!("Generating test data ..."); + + let rng = &mut StdRng::seed_from_u64(0); + let entries = generate_entries(rng); + let entries_meta = get_entries_meta(&entries); + + tracing::info!("Done generating {} entries", entries.len()); + + tracing::info!("Writing test data to delta layer ..."); + let mut writer = DeltaLayerWriter::new( + harness.conf, + timeline_id, + harness.tenant_shard_id, + entries_meta.key_range.start, + entries_meta.lsn_range.clone(), + ) + .await?; + + for entry in entries { + let (_, res) = writer + .put_value_bytes(entry.key, entry.lsn, entry.value, false) + .await; + res?; + } + + let resident = writer.finish(entries_meta.key_range.end, &timeline).await?; + + let inner = resident.get_inner_delta(&ctx).await?; + + let file_size = inner.file.metadata().await?.len(); + tracing::info!( + "Done writing test data to delta layer. Resulting file size is: {}", + file_size + ); + + for i in 0..constants::READS_COUNT { + tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT); + + let block_reader = FileBlockReader::new(&inner.file, inner.file_id); + let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( + inner.index_start_blk, + inner.index_root_blk, + block_reader, + ); + + let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES); + let mut reconstruct_state = ValuesReconstructState::new(); + let keyspace = pick_random_keyspace(rng, &entries_meta.key_range); + let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64; + + let vectored_reads = DeltaLayerInner::plan_reads( + keyspace.clone(), + entries_meta.lsn_range.clone(), + data_end_offset, + index_reader, + planner, + &mut reconstruct_state, + &ctx, + ) + .await?; + + let vectored_blob_reader = VectoredBlobReader::new(&inner.file); + let buf_size = DeltaLayerInner::get_min_read_buffer_size( + &vectored_reads, + constants::MAX_VECTORED_READ_BYTES, + ); + let mut buf = Some(BytesMut::with_capacity(buf_size)); + + for read in vectored_reads { + let blobs_buf = vectored_blob_reader + .read_blobs(&read, buf.take().expect("Should have a buffer")) + .await?; + for meta in blobs_buf.blobs.iter() { + let value = &blobs_buf.buf[meta.start..meta.end]; + assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]); + } + + buf = Some(blobs_buf.buf); + } + } + + Ok(()) + } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 14c79e413c..5b44d2bc2c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -44,6 +44,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; use hex; +use itertools::Itertools; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; @@ -540,7 +541,25 @@ impl ImageLayerInner { let vectored_blob_reader = VectoredBlobReader::new(&self.file); for read in reads.into_iter() { - let buf = BytesMut::with_capacity(max_vectored_read_bytes); + let buf_size = read.size(); + + if buf_size > max_vectored_read_bytes { + // If the read is oversized, it should only contain one key. + let offenders = read + .blobs_at + .as_slice() + .iter() + .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn)) + .join(", "); + tracing::warn!( + "Oversized vectored read ({} > {}) for keys {}", + buf_size, + max_vectored_read_bytes, + offenders + ); + } + + let buf = BytesMut::with_capacity(buf_size); let res = vectored_blob_reader.read_blobs(&read, buf).await; match res { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 8ba37b5a86..27e60f783c 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1759,6 +1759,18 @@ impl ResidentLayer { pub(crate) fn metadata(&self) -> LayerFileMetadata { self.owner.metadata() } + + #[cfg(test)] + pub(crate) async fn get_inner_delta<'a>( + &'a self, + ctx: &RequestContext, + ) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> { + let owner = &self.owner.0; + match self.downloaded.get(owner, ctx).await? { + LayerKind::Delta(d) => Ok(d), + LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")), + } + } } impl AsLayerDesc for ResidentLayer { diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 805f70b23b..3a6950cf88 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -61,7 +61,7 @@ pub struct VectoredRead { } impl VectoredRead { - fn size(&self) -> usize { + pub fn size(&self) -> usize { (self.end - self.start) as usize } }