mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
pageserver: fix oversized key on vectored read (#7259)
## Problem During this week's deployment we observed panics due to the blobs for certain keys not fitting in the vectored read buffers. The likely cause of this is a bloated AUX_FILE_KEY caused by logical replication. ## Summary of changes This pr fixes the issue by allocating a buffer big enough to fit the widest read. It also has the benefit of saving space if all keys in the read have blobs smaller than the max vectored read size. If the soft limit for the max size of a vectored read is violated, we print a warning which includes the offending key and lsn. A randomised (but deterministic) end to end test is also added for vectored reads on the delta layer.
This commit is contained in:
@@ -47,6 +47,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -946,6 +947,34 @@ impl DeltaLayerInner {
|
||||
Ok(planner.finish())
|
||||
}
|
||||
|
||||
fn get_min_read_buffer_size(
|
||||
planned_reads: &[VectoredRead],
|
||||
read_size_soft_max: usize,
|
||||
) -> usize {
|
||||
let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
|
||||
return read_size_soft_max;
|
||||
};
|
||||
|
||||
let largest_read_size = largest_read.size();
|
||||
if largest_read_size > read_size_soft_max {
|
||||
// If the read is oversized, it should only contain one key.
|
||||
let offenders = largest_read
|
||||
.blobs_at
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
|
||||
.join(", ");
|
||||
tracing::warn!(
|
||||
"Oversized vectored read ({} > {}) for keys {}",
|
||||
largest_read_size,
|
||||
read_size_soft_max,
|
||||
offenders
|
||||
);
|
||||
}
|
||||
|
||||
largest_read_size
|
||||
}
|
||||
|
||||
async fn do_reads_and_update_state(
|
||||
&self,
|
||||
reads: Vec<VectoredRead>,
|
||||
@@ -959,7 +988,8 @@ impl DeltaLayerInner {
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into();
|
||||
let mut buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
|
||||
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
// Note that reads are processed in reverse order (from highest key+lsn).
|
||||
// This is the order that `ReconstructState` requires such that it can
|
||||
@@ -986,7 +1016,7 @@ impl DeltaLayerInner {
|
||||
|
||||
// We have "lost" the buffer since the lower level IO api
|
||||
// doesn't return the buffer on error. Allocate a new one.
|
||||
buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
|
||||
buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
continue;
|
||||
}
|
||||
@@ -1210,9 +1240,16 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
|
||||
mod test {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use itertools::MinMaxResult;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::RngCore;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
|
||||
context::DownloadBehavior,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
|
||||
/// Construct an index for a fictional delta layer and and then
|
||||
@@ -1332,4 +1369,229 @@ mod test {
|
||||
|
||||
assert_eq!(planned_blobs, expected_blobs);
|
||||
}
|
||||
|
||||
mod constants {
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Offset used by all lsns in this test
|
||||
pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
|
||||
/// Number of unique keys including in the test data
|
||||
pub(super) const KEY_COUNT: u8 = 60;
|
||||
/// Max number of different lsns for each key
|
||||
pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
|
||||
/// Possible value sizes for each key along with a probability weight
|
||||
pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
|
||||
/// Probability that there will be a gap between the current key and the next one (33.3%)
|
||||
pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
|
||||
/// The minimum size of a key range in all the generated reads
|
||||
pub(super) const MIN_RANGE_SIZE: i128 = 10;
|
||||
/// The number of ranges included in each vectored read
|
||||
pub(super) const RANGES_COUNT: u8 = 2;
|
||||
/// The number of vectored reads performed
|
||||
pub(super) const READS_COUNT: u8 = 100;
|
||||
/// Soft max size of a vectored read. Will be violated if we have to read keys
|
||||
/// with values larger than the limit
|
||||
pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
|
||||
}
|
||||
|
||||
struct Entry {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
value: Vec<u8>,
|
||||
}
|
||||
|
||||
fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
|
||||
let mut current_key = Key::MIN;
|
||||
|
||||
let mut entries = Vec::new();
|
||||
for _ in 0..constants::KEY_COUNT {
|
||||
let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
|
||||
let mut lsns_iter =
|
||||
std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
|
||||
Some(Lsn(lsn.0 + 0x08))
|
||||
});
|
||||
let mut lsns = Vec::new();
|
||||
while lsns.len() < count as usize {
|
||||
let take = rng.gen_bool(0.5);
|
||||
let lsn = lsns_iter.next().unwrap();
|
||||
if take {
|
||||
lsns.push(lsn);
|
||||
}
|
||||
}
|
||||
|
||||
for lsn in lsns {
|
||||
let size = constants::VALUE_SIZES
|
||||
.choose_weighted(rng, |item| item.1)
|
||||
.unwrap()
|
||||
.0;
|
||||
let mut buf = vec![0; size];
|
||||
rng.fill_bytes(&mut buf);
|
||||
|
||||
entries.push(Entry {
|
||||
key: current_key,
|
||||
lsn,
|
||||
value: buf,
|
||||
})
|
||||
}
|
||||
|
||||
let gap = constants::KEY_GAP_CHANGES
|
||||
.choose_weighted(rng, |item| item.1)
|
||||
.unwrap()
|
||||
.0;
|
||||
if gap {
|
||||
current_key = current_key.add(2);
|
||||
} else {
|
||||
current_key = current_key.add(1);
|
||||
}
|
||||
}
|
||||
|
||||
entries
|
||||
}
|
||||
|
||||
struct EntriesMeta {
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
index: BTreeMap<(Key, Lsn), Vec<u8>>,
|
||||
}
|
||||
|
||||
fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
|
||||
let key_range = match entries.iter().minmax_by_key(|e| e.key) {
|
||||
MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
|
||||
_ => panic!("More than one entry is always expected"),
|
||||
};
|
||||
|
||||
let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
|
||||
MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
|
||||
_ => panic!("More than one entry is always expected"),
|
||||
};
|
||||
|
||||
let mut index = BTreeMap::new();
|
||||
for entry in entries.iter() {
|
||||
index.insert((entry.key, entry.lsn), entry.value.clone());
|
||||
}
|
||||
|
||||
EntriesMeta {
|
||||
key_range,
|
||||
lsn_range,
|
||||
index,
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
|
||||
let start = key_range.start.to_i128();
|
||||
let end = key_range.end.to_i128();
|
||||
|
||||
let mut keyspace = KeySpace::default();
|
||||
|
||||
for _ in 0..constants::RANGES_COUNT {
|
||||
let mut range: Option<Range<Key>> = Option::default();
|
||||
while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
|
||||
let range_start = rng.gen_range(start..end);
|
||||
let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
|
||||
if range_end_offset >= end {
|
||||
range = Some(Key::from_i128(range_start)..Key::from_i128(end));
|
||||
} else {
|
||||
let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
|
||||
range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
|
||||
}
|
||||
}
|
||||
keyspace.ranges.push(range.unwrap());
|
||||
}
|
||||
|
||||
keyspace
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let timeline_id = TimelineId::generate();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Generating test data ...");
|
||||
|
||||
let rng = &mut StdRng::seed_from_u64(0);
|
||||
let entries = generate_entries(rng);
|
||||
let entries_meta = get_entries_meta(&entries);
|
||||
|
||||
tracing::info!("Done generating {} entries", entries.len());
|
||||
|
||||
tracing::info!("Writing test data to delta layer ...");
|
||||
let mut writer = DeltaLayerWriter::new(
|
||||
harness.conf,
|
||||
timeline_id,
|
||||
harness.tenant_shard_id,
|
||||
entries_meta.key_range.start,
|
||||
entries_meta.lsn_range.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
for entry in entries {
|
||||
let (_, res) = writer
|
||||
.put_value_bytes(entry.key, entry.lsn, entry.value, false)
|
||||
.await;
|
||||
res?;
|
||||
}
|
||||
|
||||
let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
|
||||
|
||||
let inner = resident.get_inner_delta(&ctx).await?;
|
||||
|
||||
let file_size = inner.file.metadata().await?.len();
|
||||
tracing::info!(
|
||||
"Done writing test data to delta layer. Resulting file size is: {}",
|
||||
file_size
|
||||
);
|
||||
|
||||
for i in 0..constants::READS_COUNT {
|
||||
tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
|
||||
|
||||
let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
|
||||
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
|
||||
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
keyspace.clone(),
|
||||
entries_meta.lsn_range.clone(),
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
|
||||
let buf_size = DeltaLayerInner::get_min_read_buffer_size(
|
||||
&vectored_reads,
|
||||
constants::MAX_VECTORED_READ_BYTES,
|
||||
);
|
||||
let mut buf = Some(BytesMut::with_capacity(buf_size));
|
||||
|
||||
for read in vectored_reads {
|
||||
let blobs_buf = vectored_blob_reader
|
||||
.read_blobs(&read, buf.take().expect("Should have a buffer"))
|
||||
.await?;
|
||||
for meta in blobs_buf.blobs.iter() {
|
||||
let value = &blobs_buf.buf[meta.start..meta.end];
|
||||
assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
|
||||
}
|
||||
|
||||
buf = Some(blobs_buf.buf);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -540,7 +541,25 @@ impl ImageLayerInner {
|
||||
|
||||
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
|
||||
for read in reads.into_iter() {
|
||||
let buf = BytesMut::with_capacity(max_vectored_read_bytes);
|
||||
let buf_size = read.size();
|
||||
|
||||
if buf_size > max_vectored_read_bytes {
|
||||
// If the read is oversized, it should only contain one key.
|
||||
let offenders = read
|
||||
.blobs_at
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
|
||||
.join(", ");
|
||||
tracing::warn!(
|
||||
"Oversized vectored read ({} > {}) for keys {}",
|
||||
buf_size,
|
||||
max_vectored_read_bytes,
|
||||
offenders
|
||||
);
|
||||
}
|
||||
|
||||
let buf = BytesMut::with_capacity(buf_size);
|
||||
let res = vectored_blob_reader.read_blobs(&read, buf).await;
|
||||
|
||||
match res {
|
||||
|
||||
@@ -1759,6 +1759,18 @@ impl ResidentLayer {
|
||||
pub(crate) fn metadata(&self) -> LayerFileMetadata {
|
||||
self.owner.metadata()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn get_inner_delta<'a>(
|
||||
&'a self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
|
||||
let owner = &self.owner.0;
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
LayerKind::Delta(d) => Ok(d),
|
||||
LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsLayerDesc for ResidentLayer {
|
||||
|
||||
@@ -61,7 +61,7 @@ pub struct VectoredRead {
|
||||
}
|
||||
|
||||
impl VectoredRead {
|
||||
fn size(&self) -> usize {
|
||||
pub fn size(&self) -> usize {
|
||||
(self.end - self.start) as usize
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user